diff --git a/CHANGELOG.md b/CHANGELOG.md index 79df58a19c0ae..fe90492f3d309 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -183,6 +183,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263)) - Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248)) - Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503)) +- Move ZStd to a plugin ([#9658](https://github.com/opensearch-project/OpenSearch/pull/9658)) - [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908)) ### Deprecated diff --git a/modules/reindex/build.gradle b/modules/reindex/build.gradle index cad7d67f3ef84..ea517ca53c000 100644 --- a/modules/reindex/build.gradle +++ b/modules/reindex/build.gradle @@ -69,6 +69,7 @@ dependencies { testImplementation project(':modules:transport-netty4') // for parent/child testing testImplementation project(':modules:parent-join') + testImplementation project(':plugins:custom-codecs') } restResources { @@ -95,4 +96,5 @@ forbiddenPatterns { tasks.named("bundlePlugin").configure { dependsOn("copyParentJoinMetadata") dependsOn("copyTransportNetty4Metadata") + dependsOn("copyCustomCodecsMetadata") } diff --git a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java index 7c2fe8d99c330..60afe44babc26 100644 --- a/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java +++ b/modules/reindex/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecReindexIT.java @@ -15,14 +15,19 @@ import org.opensearch.action.support.ActiveShardCount; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.customcodecs.CustomCodecPlugin; import org.opensearch.index.engine.Segment; import org.opensearch.index.reindex.BulkByScrollResponse; import org.opensearch.index.reindex.ReindexAction; +import org.opensearch.index.reindex.ReindexModulePlugin; import org.opensearch.index.reindex.ReindexRequestBuilder; import org.opensearch.index.reindex.ReindexTestCase; +import org.opensearch.plugins.Plugin; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; @@ -40,6 +45,11 @@ public class MultiCodecReindexIT extends ReindexTestCase { + @Override + protected Collection> nodePlugins() { + return List.of(CustomCodecPlugin.class, ReindexModulePlugin.class); + } + public void testReindexingMultipleCodecs() throws InterruptedException, ExecutionException { internalCluster().ensureAtLeastNumDataNodes(1); Map codecMap = Map.of( diff --git a/plugins/custom-codecs/build.gradle b/plugins/custom-codecs/build.gradle new file mode 100644 index 0000000000000..253822e88b817 --- /dev/null +++ b/plugins/custom-codecs/build.gradle @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * 
compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.internal-cluster-test' + +opensearchplugin { + name 'custom-codecs' + description 'A plugin that implements custom compression codecs.' + classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin' + licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt') + noticeFile rootProject.file('NOTICE.txt') +} + +dependencies { + api "com.github.luben:zstd-jni:1.5.5-5" +} + +testingConventions.enabled = false; diff --git a/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java b/plugins/custom-codecs/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java similarity index 89% rename from server/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java rename to plugins/custom-codecs/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java index 5f3e53f1454fc..9bef3e2d3d235 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java +++ b/plugins/custom-codecs/src/internalClusterTest/java/org/opensearch/index/codec/CodecCompressionLevelIT.java @@ -12,14 +12,24 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.customcodecs.CustomCodecPlugin; +import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ExecutionException; +import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC; +import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) public class CodecCompressionLevelIT extends OpenSearchIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(CustomCodecPlugin.class); + } public void testLuceneCodecsCreateIndexWithCompressionLevel() { @@ -62,7 +72,7 @@ public void testZStandardCodecsCreateIndexWithCompressionLevel() { Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC)) + .put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC)) .put("index.codec.compression_level", randomIntBetween(1, 6)) .build() ); @@ -81,7 +91,7 @@ public void testZStandardToLuceneCodecsWithCompressionLevel() throws ExecutionEx Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC)) + .put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC)) .put("index.codec.compression_level", randomIntBetween(1, 6)) .build() ); @@ -164,7 +174,7 @@ public void testLuceneToZStandardCodecsWithCompressionLevel() throws ExecutionEx .updateSettings( new UpdateSettingsRequest(index).settings( Settings.builder() - .put("index.codec", randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC)) + 
.put("index.codec", randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC)) .put("index.codec.compression_level", randomIntBetween(1, 6)) ) ) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java b/plugins/custom-codecs/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java similarity index 95% rename from server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java rename to plugins/custom-codecs/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java index 23d5cc3b35486..82633513a2f50 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java +++ b/plugins/custom-codecs/src/internalClusterTest/java/org/opensearch/index/codec/MultiCodecMergeIT.java @@ -15,11 +15,15 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; +import org.opensearch.index.codec.customcodecs.CustomCodecPlugin; import org.opensearch.index.engine.Segment; +import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -40,6 +44,11 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) public class MultiCodecMergeIT extends OpenSearchIntegTestCase { + @Override + protected Collection> nodePlugins() { + return Collections.singletonList(CustomCodecPlugin.class); + } + public void testForceMergeMultipleCodecs() throws ExecutionException, InterruptedException { Map codecMap = Map.of( diff --git a/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java new file mode 100644 index 0000000000000..91a13a1d924a2 --- /dev/null +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.CodecServiceFactory; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.plugins.EnginePlugin; +import org.opensearch.plugins.Plugin; + +import java.util.Optional; + +/** + * A plugin that implements custom codecs. Supports these codecs: + *
+ * <ul> + * <li>ZSTD + * <li>ZSTDNODICT + * </ul>
+ * + * @opensearch.internal + */ +public final class CustomCodecPlugin extends Plugin implements EnginePlugin { + + /** + * Creates a new instance + */ + public CustomCodecPlugin() {} + + /** + * @param indexSettings is the default indexSettings + * @return the engine factory + */ + @Override + public Optional getCustomCodecServiceFactory(final IndexSettings indexSettings) { + String codecName = indexSettings.getValue(EngineConfig.INDEX_CODEC_SETTING); + if (codecName.equals(CustomCodecService.ZSTD_NO_DICT_CODEC) || codecName.equals(CustomCodecService.ZSTD_CODEC)) { + return Optional.of(new CustomCodecServiceFactory()); + } + return Optional.empty(); + } +} diff --git a/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java new file mode 100644 index 0000000000000..de0eb2b3286d3 --- /dev/null +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java @@ -0,0 +1,72 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.opensearch.common.collect.MapBuilder; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.mapper.MapperService; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Stream; + +import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING; + +/** + * CustomCodecService provides ZSTD and ZSTD_NO_DICT compression codecs. + */ +public class CustomCodecService extends CodecService { + private final Map codecs; + /** + * ZStandard codec + */ + public static final String ZSTD_CODEC = "zstd"; + /** + * ZStandard without dictionary codec + */ + public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict"; + + /** + * Creates a new CustomCodecService. + * + * @param mapperService The mapper service. + * @param indexSettings The index settings. + * @param logger The logger. 
+ */ + public CustomCodecService(MapperService mapperService, IndexSettings indexSettings, Logger logger) { + super(mapperService, indexSettings, logger); + int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING); + final MapBuilder codecs = MapBuilder.newMapBuilder(); + if (mapperService == null) { + codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel)); + codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel)); + } else { + codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel)); + codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel)); + } + this.codecs = codecs.immutableMap(); + } + + @Override + public Codec codec(String name) { + Codec codec = codecs.get(name); + if (codec == null) { + return super.codec(name); + } + return codec; + } + + @Override + public String[] availableCodecs() { + return Stream.concat(Arrays.stream(super.availableCodecs()), codecs.keySet().stream()).toArray(String[]::new); + } +} diff --git a/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java new file mode 100644 index 0000000000000..d634616162684 --- /dev/null +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.codec.CodecServiceFactory; + +/** + * A factory for creating new {@link CodecService} instance + */ +public class CustomCodecServiceFactory implements CodecServiceFactory { + + /** Creates a new instance. */ + public CustomCodecServiceFactory() {} + + @Override + public CodecService createCodecService(CodecServiceConfig config) { + return new CustomCodecService(config.getMapperService(), config.getIndexSettings(), config.getLogger()); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java similarity index 54% rename from server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java rename to plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java index 8aa422a47a073..89acc980abd2f 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java @@ -15,6 +15,9 @@ import org.opensearch.index.codec.PerFieldMappingPostingFormatCodec; import org.opensearch.index.mapper.MapperService; +import java.util.Collections; +import java.util.Set; + /** * * Extends {@link FilterCodec} to reuse the functionality of Lucene Codec. @@ -23,12 +26,48 @@ * @opensearch.internal */ public abstract class Lucene95CustomCodec extends FilterCodec { + + /** Default compression level used for compression */ public static final int DEFAULT_COMPRESSION_LEVEL = 3; /** Each mode represents a compression algorithm. 
*/ public enum Mode { - ZSTD, - ZSTD_NO_DICT + /** + * ZStandard mode with dictionary + */ + ZSTD("ZSTD", Set.of("zstd")), + /** + * ZStandard mode without dictionary + */ + ZSTD_NO_DICT("ZSTDNODICT", Set.of("zstd_no_dict")), + /** + * Deprecated ZStandard mode, added for backward compatibility to support indices created in 2.9.0 where + * both ZSTD and ZSTD_NO_DICT used Lucene95CustomCodec underneath. This should not be used to + * create new indices. + */ + ZSTD_DEPRECATED("Lucene95CustomCodec", Collections.emptySet()); + + private final String codec; + private final Set aliases; + + Mode(String codec, Set aliases) { + this.codec = codec; + this.aliases = aliases; + } + + /** + * Returns the Codec that is registered with Lucene + */ + public String getCodec() { + return codec; + } + + /** + * Returns the aliases of the Codec + */ + public Set getAliases() { + return aliases; + } } private final StoredFieldsFormat storedFieldsFormat; @@ -51,12 +90,22 @@ public Lucene95CustomCodec(Mode mode) { * @param compressionLevel The compression level. */ public Lucene95CustomCodec(Mode mode, int compressionLevel) { - super("Lucene95CustomCodec", new Lucene95Codec()); + super(mode.getCodec(), new Lucene95Codec()); this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel); } + /** + * Creates a new compression codec with the given compression level. We use + * lowercase letters when registering the codec so that we remain consistent with + * the other compression codecs: default, lucene_default, and best_compression. + * + * @param mode The compression codec (ZSTD or ZSTDNODICT). + * @param compressionLevel The compression level. + * @param mapperService The mapper service. + * @param logger The logger. + */ public Lucene95CustomCodec(Mode mode, int compressionLevel, MapperService mapperService, Logger logger) { - super("Lucene95CustomCodec", new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger)); + super(mode.getCodec(), new PerFieldMappingPostingFormatCodec(Lucene95Codec.Mode.BEST_SPEED, mapperService, logger)); this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel); } diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java similarity index 98% rename from server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java rename to plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java index 2816e2907a5f6..79d97035089ab 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java @@ -101,6 +101,7 @@ public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOCo StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { switch (mode) { case ZSTD: + case ZSTD_DEPRECATED: return new Lucene90CompressingStoredFieldsFormat( "CustomStoredFieldsZstd", zstdCompressionMode, @@ -125,6 +126,9 @@ Lucene95CustomCodec.Mode getMode() { return mode; } + /** + * Returns the compression level. 
+ */ public int getCompressionLevel() { return compressionLevel; } diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java similarity index 79% rename from server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java rename to plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java index 042f7eaa29e53..a3e3a34a5d258 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java @@ -10,14 +10,17 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.settings.Setting; +import org.opensearch.index.codec.CodecAliases; import org.opensearch.index.codec.CodecSettings; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.mapper.MapperService; +import java.util.Set; + /** * ZstdCodec provides ZSTD compressor using the zstd-jni library. */ -public class ZstdCodec extends Lucene95CustomCodec implements CodecSettings { +public class ZstdCodec extends Lucene95CustomCodec implements CodecSettings, CodecAliases { /** * Creates a new ZstdCodec instance with the default compression level. @@ -35,6 +38,13 @@ public ZstdCodec(int compressionLevel) { super(Mode.ZSTD, compressionLevel); } + /** + * Creates a new ZstdCodec instance. + * + * @param mapperService The mapper service. + * @param logger The logger. + * @param compressionLevel The compression level. + */ public ZstdCodec(MapperService mapperService, Logger logger, int compressionLevel) { super(Mode.ZSTD, compressionLevel, mapperService, logger); } @@ -49,4 +59,9 @@ public String toString() { public boolean supports(Setting setting) { return setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING); } + + @Override + public Set aliases() { + return Mode.ZSTD.getAliases(); + } } diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java similarity index 100% rename from server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java rename to plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java diff --git a/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdDeprecatedCodec.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdDeprecatedCodec.java new file mode 100644 index 0000000000000..02fa386db97b3 --- /dev/null +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdDeprecatedCodec.java @@ -0,0 +1,61 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.Setting; +import org.opensearch.index.codec.CodecSettings; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.mapper.MapperService; + +/** + * ZstdDeprecatedCodec provides ZSTD compressor using the zstd-jni library. + * Added to support backward compatibility for indices created with Lucene95CustomCodec as codec name. 
+ */ +@Deprecated(since = "2.10") +public class ZstdDeprecatedCodec extends Lucene95CustomCodec implements CodecSettings { + + /** + * Creates a new ZstdDefaultCodec instance with the default compression level. + */ + public ZstdDeprecatedCodec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new ZstdDefaultCodec instance. + * + * @param compressionLevel The compression level. + */ + public ZstdDeprecatedCodec(int compressionLevel) { + super(Mode.ZSTD_DEPRECATED, compressionLevel); + } + + /** + * Creates a new ZstdDefaultCodec instance. + * + * @param mapperService The mapper service. + * @param logger The logger. + * @param compressionLevel The compression level. + */ + public ZstdDeprecatedCodec(MapperService mapperService, Logger logger, int compressionLevel) { + super(Mode.ZSTD_DEPRECATED, compressionLevel, mapperService, logger); + } + + /** The name for this codec. */ + @Override + public String toString() { + return getClass().getSimpleName(); + } + + @Override + public boolean supports(Setting setting) { + return setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING); + } +} diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java similarity index 78% rename from server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java rename to plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java index a7e8e0e42ee68..ea7351f755361 100644 --- a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java +++ b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java @@ -10,14 +10,17 @@ import org.apache.logging.log4j.Logger; import org.opensearch.common.settings.Setting; +import org.opensearch.index.codec.CodecAliases; import org.opensearch.index.codec.CodecSettings; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.mapper.MapperService; +import java.util.Set; + /** * ZstdNoDictCodec provides ZSTD compressor without a dictionary support. */ -public class ZstdNoDictCodec extends Lucene95CustomCodec implements CodecSettings { +public class ZstdNoDictCodec extends Lucene95CustomCodec implements CodecSettings, CodecAliases { /** * Creates a new ZstdNoDictCodec instance with the default compression level. @@ -35,6 +38,13 @@ public ZstdNoDictCodec(int compressionLevel) { super(Mode.ZSTD_NO_DICT, compressionLevel); } + /** + * Creates a new ZstdNoDictCodec instance. + * + * @param mapperService The mapper service. + * @param logger The logger. + * @param compressionLevel The compression level. 
+ */ public ZstdNoDictCodec(MapperService mapperService, Logger logger, int compressionLevel) { super(Mode.ZSTD_NO_DICT, compressionLevel, mapperService, logger); } @@ -49,4 +59,9 @@ public String toString() { public boolean supports(Setting setting) { return setting.equals(EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING); } + + @Override + public Set aliases() { + return Mode.ZSTD_NO_DICT.getAliases(); + } } diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java similarity index 100% rename from server/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java rename to plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java diff --git a/server/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java b/plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java similarity index 100% rename from server/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java rename to plugins/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java diff --git a/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy b/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..8161010cfa897 --- /dev/null +++ b/plugins/custom-codecs/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +grant codeBase "${codebase.zstd-jni}" { + permission java.lang.RuntimePermission "loadLibrary.*"; +}; diff --git a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/plugins/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec similarity index 63% rename from server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec rename to plugins/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec index 8b37d91cd8bc4..ba5054055d00a 100644 --- a/server/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec +++ b/plugins/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec @@ -1,2 +1,3 @@ org.opensearch.index.codec.customcodecs.ZstdCodec org.opensearch.index.codec.customcodecs.ZstdNoDictCodec +org.opensearch.index.codec.customcodecs.ZstdDeprecatedCodec diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java b/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java similarity index 100% rename from server/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java rename to plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java diff --git a/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/CustomCodecTests.java b/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/CustomCodecTests.java new file mode 100644 index 0000000000000..5365b9e222d9a --- /dev/null +++ b/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/CustomCodecTests.java @@ -0,0 +1,250 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.LogManager; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.lucene90.Lucene90StoredFieldsFormat; +import org.apache.lucene.codecs.lucene95.Lucene95Codec; +import org.apache.lucene.document.Document; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.SegmentReader; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.util.LuceneTestCase.SuppressCodecs; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.env.Environment; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.analysis.IndexAnalyzers; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.codec.CodecServiceFactory; +import org.opensearch.index.codec.CodecSettings; +import org.opensearch.index.engine.EngineConfig; +import org.opensearch.index.mapper.MapperService; +import org.opensearch.index.similarity.SimilarityService; +import org.opensearch.indices.mapper.MapperRegistry; +import org.opensearch.plugins.MapperPlugin; +import org.opensearch.test.IndexSettingsModule; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.util.Collections; +import java.util.Optional; + +import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_CODEC; +import static org.opensearch.index.codec.customcodecs.CustomCodecService.ZSTD_NO_DICT_CODEC; +import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING; + +@SuppressCodecs("*") // we test against default codec so never get a random one here! 
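
The tests that follow look up codecs by name. Lucene resolves the names "ZSTD" and "ZSTDNODICT" through the META-INF/services/org.apache.lucene.codecs.Codec entries registered above, while the lowercase "zstd" and "zstd_no_dict" names are handled at the OpenSearch level by CustomCodecService. A minimal sketch of the Lucene-side SPI lookup, assuming the custom-codecs jar is on the classpath (CodecLookupDemo is illustrative, not part of this change):

    import org.apache.lucene.codecs.Codec;

    public class CodecLookupDemo {
        public static void main(String[] args) {
            // Lucene scans META-INF/services/org.apache.lucene.codecs.Codec on the
            // classpath; the names below match what Mode.getCodec() registers.
            Codec zstd = Codec.forName("ZSTD");
            Codec zstdNoDict = Codec.forName("ZSTDNODICT");
            System.out.println(zstd.getName() + ", " + zstdNoDict.getName());
            // availableCodecs() lists every SPI-registered codec name.
            System.out.println(Codec.availableCodecs());
        }
    }
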
+public class CustomCodecTests extends OpenSearchTestCase { + + private CustomCodecPlugin plugin; + + @Before + public void setup() { + plugin = new CustomCodecPlugin(); + } + + public void testZstd() throws Exception { + Codec codec = createCodecService(false).codec("zstd"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); + Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); + } + + public void testZstdNoDict() throws Exception { + Codec codec = createCodecService(false).codec("zstd_no_dict"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); + Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); + } + + public void testZstdDeprecatedCodec() { + final IllegalArgumentException e = expectThrows( + IllegalArgumentException.class, + () -> createCodecService(false).codec("ZSTD_DEPRECATED") + ); + assertTrue(e.getMessage().startsWith("failed to find codec")); + } + + public void testZstdWithCompressionLevel() throws Exception { + int randomCompressionLevel = randomIntBetween(1, 6); + Codec codec = createCodecService(randomCompressionLevel, "zstd").codec("zstd"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); + Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionLevel()); + } + + public void testZstdNoDictWithCompressionLevel() throws Exception { + int randomCompressionLevel = randomIntBetween(1, 6); + Codec codec = createCodecService(randomCompressionLevel, "zstd_no_dict").codec("zstd_no_dict"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); + Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionLevel()); + } + + public void testBestCompressionWithCompressionLevel() { + final Settings zstdSettings = Settings.builder() + .put(INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getKey(), randomIntBetween(1, 6)) + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), randomFrom(ZSTD_CODEC, ZSTD_NO_DICT_CODEC)) + .build(); + + // able to validate zstd + final IndexScopedSettings zstdIndexScopedSettings = new IndexScopedSettings( + zstdSettings, + IndexScopedSettings.BUILT_IN_INDEX_SETTINGS + ); + zstdIndexScopedSettings.validate(zstdSettings, true); + } + + public void testLuceneCodecsWithCompressionLevel() { + final Settings customCodecSettings = Settings.builder() + .put(INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getKey(), randomIntBetween(1, 6)) + .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), randomFrom("zstd", "zstd_no_dict")) + .build(); + + final IndexScopedSettings customCodecIndexScopedSettings = new IndexScopedSettings( + customCodecSettings, + IndexScopedSettings.BUILT_IN_INDEX_SETTINGS + ); + customCodecIndexScopedSettings.validate(customCodecSettings, true); + } + + public void testZstandardCompressionLevelSupport() throws Exception { + CodecService codecService = createCodecService(false); + CodecSettings zstdCodec = (CodecSettings) codecService.codec("zstd"); + CodecSettings 
zstdNoDictCodec = (CodecSettings) codecService.codec("zstd_no_dict"); + assertTrue(zstdCodec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING)); + assertTrue(zstdNoDictCodec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING)); + } + + public void testDefaultMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("default"); + assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_SPEED, codec); + } + + public void testBestCompressionMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("best_compression"); + assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_COMPRESSION, codec); + } + + public void testZstdMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("zstd"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); + Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); + } + + public void testZstdNoDictMapperServiceNull() throws Exception { + Codec codec = createCodecService(true).codec("zstd_no_dict"); + assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); + Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); + assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); + } + + // write some docs with it, inspect .si to see this was the used compression + private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Codec actual) throws Exception { + SegmentReader sr = getSegmentReader(actual); + String v = sr.getSegmentInfo().info.getAttribute(Lucene90StoredFieldsFormat.MODE_KEY); + assertNotNull(v); + assertEquals(expected, Lucene95Codec.Mode.valueOf(v)); + } + + private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expected, Codec actual) throws Exception { + SegmentReader sr = getSegmentReader(actual); + String v = sr.getSegmentInfo().info.getAttribute(Lucene95CustomStoredFieldsFormat.MODE_KEY); + assertNotNull(v); + assertEquals(expected, Lucene95CustomCodec.Mode.valueOf(v)); + } + + private CodecService createCodecService(boolean isMapperServiceNull) throws IOException { + Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); + if (isMapperServiceNull) { + return new CustomCodecService(null, IndexSettingsModule.newIndexSettings("_na", nodeSettings), LogManager.getLogger("test")); + } + return buildCodecService(nodeSettings); + } + + private CodecService createCodecService(int randomCompressionLevel, String codec) throws IOException { + Settings nodeSettings = Settings.builder() + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) + .put("index.codec", codec) + .put("index.codec.compression_level", randomCompressionLevel) + .build(); + return buildCodecService(nodeSettings); + } + + private CodecService buildCodecService(Settings nodeSettings) throws IOException { + + IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("_na", nodeSettings); + SimilarityService similarityService = new SimilarityService(indexSettings, null, Collections.emptyMap()); + IndexAnalyzers indexAnalyzers = createTestAnalysis(indexSettings, nodeSettings).indexAnalyzers; + MapperRegistry mapperRegistry = new MapperRegistry(Collections.emptyMap(), 
Collections.emptyMap(), MapperPlugin.NOOP_FIELD_FILTER); + MapperService service = new MapperService( + indexSettings, + indexAnalyzers, + xContentRegistry(), + similarityService, + mapperRegistry, + () -> null, + () -> false, + null + ); + + Optional customCodecServiceFactory = plugin.getCustomCodecServiceFactory(indexSettings); + if (customCodecServiceFactory.isPresent()) { + return customCodecServiceFactory.get().createCodecService(new CodecServiceConfig(indexSettings, service, logger)); + } + return new CustomCodecService(service, indexSettings, LogManager.getLogger("test")); + } + + private SegmentReader getSegmentReader(Codec codec) throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(null); + iwc.setCodec(codec); + IndexWriter iw = new IndexWriter(dir, iwc); + iw.addDocument(new Document()); + iw.commit(); + iw.close(); + DirectoryReader ir = DirectoryReader.open(dir); + SegmentReader sr = (SegmentReader) ir.leaves().get(0).reader(); + ir.close(); + dir.close(); + return sr; + } + +} diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java b/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java similarity index 100% rename from server/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java rename to plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormatTests.java diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java b/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java similarity index 100% rename from server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java rename to plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java diff --git a/server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java b/plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java similarity index 100% rename from server/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java rename to plugins/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/index/codec/ZstdNotEnabledIT.java b/server/src/internalClusterTest/java/org/opensearch/index/codec/ZstdNotEnabledIT.java new file mode 100644 index 0000000000000..9b1fa77fc9a5a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/index/codec/ZstdNotEnabledIT.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.index.codec; + +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.util.List; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST) +public class ZstdNotEnabledIT extends OpenSearchIntegTestCase { + + public void testZStdCodecsWithoutPluginInstalled() { + + internalCluster().startNode(); + final String index = "test-index"; + + // creating index with zstd and zstd_no_dict should fail if custom-codecs plugin is not installed + for (String codec : List.of("zstd", "zstd_no_dict")) { + assertThrows( + IllegalArgumentException.class, + () -> createIndex( + index, + Settings.builder() + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.codec", codec) + .build() + ) + ); + } + } + +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/crypto/CryptoSettings.java b/server/src/main/java/org/opensearch/action/admin/cluster/crypto/CryptoSettings.java index 0d289e3552cdf..bd783b349bed4 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/crypto/CryptoSettings.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/crypto/CryptoSettings.java @@ -31,7 +31,6 @@ * @opensearch.internal */ public class CryptoSettings implements Writeable, ToXContentObject { - private String keyProviderName; private String keyProviderType; private Settings settings = EMPTY_SETTINGS; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/CryptoMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/CryptoMetadata.java index bf55ef0837ea9..27803cb106005 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/CryptoMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/CryptoMetadata.java @@ -27,6 +27,10 @@ * @opensearch.internal */ public class CryptoMetadata implements Writeable { + static final public String CRYPTO_METADATA_KEY = "crypto_metadata"; + static final public String KEY_PROVIDER_NAME_KEY = "key_provider_name"; + static final public String KEY_PROVIDER_TYPE_KEY = "key_provider_type"; + static final public String SETTINGS_KEY = "settings"; private final String keyProviderName; private final String keyProviderType; private final Settings settings; @@ -104,17 +108,17 @@ public static CryptoMetadata fromXContent(XContentParser parser) throws IOExcept while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { String currentFieldName = parser.currentName(); - if ("key_provider_name".equals(currentFieldName)) { + if (KEY_PROVIDER_NAME_KEY.equals(currentFieldName)) { if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { throw new OpenSearchParseException("failed to parse crypto metadata [{}], unknown type"); } keyProviderName = parser.text(); - } else if ("key_provider_type".equals(currentFieldName)) { + } else if (KEY_PROVIDER_TYPE_KEY.equals(currentFieldName)) { if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { throw new OpenSearchParseException("failed to parse crypto metadata [{}], unknown type"); } keyProviderType = parser.text(); - } else if ("settings".equals(currentFieldName)) { + } else if (SETTINGS_KEY.equals(currentFieldName)) { if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException("failed to parse crypto metadata [{}], unknown type"); } @@ -130,10 
+134,10 @@ public static CryptoMetadata fromXContent(XContentParser parser) throws IOExcept } public void toXContent(CryptoMetadata cryptoMetadata, XContentBuilder builder, ToXContent.Params params) throws IOException { - builder.startObject("crypto_metadata"); - builder.field("key_provider_name", cryptoMetadata.keyProviderName()); - builder.field("key_provider_type", cryptoMetadata.keyProviderType()); - builder.startObject("settings"); + builder.startObject(CRYPTO_METADATA_KEY); + builder.field(KEY_PROVIDER_NAME_KEY, cryptoMetadata.keyProviderName()); + builder.field(KEY_PROVIDER_TYPE_KEY, cryptoMetadata.keyProviderType()); + builder.startObject(SETTINGS_KEY); cryptoMetadata.settings().toXContent(builder, params); builder.endObject(); builder.endObject(); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java index 3cde53c3ae74b..1ab402e1bde4e 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/RepositoriesMetadata.java @@ -232,7 +232,7 @@ public static RepositoriesMetadata fromXContent(XContentParser parser) throws IO throw new OpenSearchParseException("failed to parse repository [{}], unknown type", name); } pendingGeneration = parser.longValue(); - } else if ("crypto_metadata".equals(currentFieldName)) { + } else if (CryptoMetadata.CRYPTO_METADATA_KEY.equals(currentFieldName)) { if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new OpenSearchParseException("failed to parse repository [{}], unknown type", name); } diff --git a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java index c7cfef5c5ce3d..5808f51f01efc 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/transfer/RemoteTransferContainer.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.CodecUtil; import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.store.IndexInput; import org.opensearch.common.CheckedTriFunction; import org.opensearch.common.SetOnce; import org.opensearch.common.StreamContext; @@ -19,11 +21,13 @@ import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.common.blobstore.transfer.stream.ResettableCheckedInputStream; import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.util.ByteUtils; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.util.Objects; +import java.util.zip.CRC32; import com.jcraft.jzlib.JZlib; @@ -244,4 +248,17 @@ public void close() throws IOException { throw new IOException("Closure of some of the multi-part streams failed."); } } + + /** + * Compute final checksum for IndexInput container checksum footer added by {@link CodecUtil} + * @param indexInput IndexInput with checksum in footer + * @param checksumBytesLength length of checksum bytes + * @return final computed checksum of entire indexInput + */ + public static long checksumOfChecksum(IndexInput indexInput, int checksumBytesLength) throws IOException { + long storedChecksum = CodecUtil.retrieveChecksum(indexInput); + CRC32 
checksumOfChecksum = new CRC32(); + checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); + return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), checksumBytesLength); + } } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 42e5ca15ab285..ca39dcd61dcc6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -10,7 +10,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.Nullable; @@ -22,6 +24,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.repositories.RepositoriesService; @@ -34,11 +38,14 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -57,6 +64,8 @@ public class RemoteClusterStateService implements Closeable { public static final String METADATA_MANIFEST_NAME_FORMAT = "%s"; + public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000; + public static final ChecksumBlobStoreFormat INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( "index-metadata", METADATA_NAME_FORMAT, @@ -130,24 +139,11 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro } ensureRepositorySet(); - final List allUploadedIndexMetadata = new ArrayList<>(); - // todo parallel upload // any validations before/after upload ? 
- for (IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { - // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 - final String indexMetadataKey = writeIndexMetadata( - clusterState.getClusterName().value(), - clusterState.getMetadata().clusterUUID(), - indexMetadata, - indexMetadataFileName(indexMetadata) - ); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( - indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), - indexMetadataKey - ); - allUploadedIndexMetadata.add(uploadedIndexMetadata); - } + final List allUploadedIndexMetadata = writeIndexMetadataParallel( + clusterState, + new ArrayList<>(clusterState.metadata().indices().values()) + ); final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { @@ -197,6 +193,9 @@ public ClusterMetadataManifest writeIncrementalMetadata( final Map allUploadedIndexMetadata = previousManifest.getIndices() .stream() .collect(Collectors.toMap(UploadedIndexMetadata::getIndexName, Function.identity())); + + List toUpload = new ArrayList<>(); + for (final IndexMetadata indexMetadata : clusterState.metadata().indices().values()) { final Long previousVersion = previousStateIndexMetadataVersionByName.get(indexMetadata.getIndex().getName()); if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { @@ -207,32 +206,22 @@ public ClusterMetadataManifest writeIncrementalMetadata( indexMetadata.getVersion() ); numIndicesUpdated++; - final String indexMetadataKey = writeIndexMetadata( - clusterState.getClusterName().value(), - clusterState.getMetadata().clusterUUID(), - indexMetadata, - indexMetadataFileName(indexMetadata) - ); - final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata( - indexMetadata.getIndex().getName(), - indexMetadata.getIndexUUID(), - indexMetadataKey - ); - allUploadedIndexMetadata.put(indexMetadata.getIndex().getName(), uploadedIndexMetadata); + toUpload.add(indexMetadata); } else { numIndicesUnchanged++; } previousStateIndexMetadataVersionByName.remove(indexMetadata.getIndex().getName()); } + List uploadedIndexMetadataList = writeIndexMetadataParallel(clusterState, toUpload); + uploadedIndexMetadataList.forEach( + uploadedIndexMetadata -> allUploadedIndexMetadata.put(uploadedIndexMetadata.getIndexName(), uploadedIndexMetadata) + ); + for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) { allUploadedIndexMetadata.remove(removedIndexName); } - final ClusterMetadataManifest manifest = uploadManifest( - clusterState, - allUploadedIndexMetadata.values().stream().collect(Collectors.toList()), - false - ); + final ClusterMetadataManifest manifest = uploadManifest(clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), false); final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos); if (durationMillis >= slowWriteLoggingThreshold.getMillis()) { logger.warn( @@ -254,6 +243,118 @@ public ClusterMetadataManifest writeIncrementalMetadata( return manifest; } + /** + * Uploads provided IndexMetadata's to remote store in parallel. The call is blocking so the method waits for upload to finish and then return. 
+ * @param clusterState current ClusterState + * @param toUpload list of IndexMetadata to upload + * @return {@code List} list of IndexMetadata uploaded to remote + * @throws IOException + */ + private List writeIndexMetadataParallel(ClusterState clusterState, List toUpload) + throws IOException { + List exceptionList = Collections.synchronizedList(new ArrayList<>(toUpload.size())); + final CountDownLatch latch = new CountDownLatch(toUpload.size()); + List result = new ArrayList<>(toUpload.size()); + + LatchedActionListener latchedActionListener = new LatchedActionListener<>( + ActionListener.wrap((UploadedIndexMetadata uploadedIndexMetadata) -> { + logger.trace( + String.format(Locale.ROOT, "IndexMetadata uploaded successfully for %s", uploadedIndexMetadata.getIndexName()) + ); + result.add(uploadedIndexMetadata); + }, ex -> { + assert ex instanceof IndexMetadataTransferException; + logger.error( + () -> new ParameterizedMessage("Exception during transfer of IndexMetadata to Remote {}", ex.getMessage()), + ex + ); + exceptionList.add(ex); + }), + latch + ); + + for (IndexMetadata indexMetadata : toUpload) { + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX/metadata_4_1690947200 + writeIndexMetadataAsync(clusterState, indexMetadata, latchedActionListener); + } + + try { + if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) { + IndexMetadataTransferException ex = new IndexMetadataTransferException( + String.format( + Locale.ROOT, + "Timed out waiting for transfer of index metadata to complete - %s", + toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + ) + ); + exceptionList.forEach(ex::addSuppressed); + throw ex; + } + } catch (InterruptedException ex) { + exceptionList.forEach(ex::addSuppressed); + IndexMetadataTransferException exception = new IndexMetadataTransferException( + String.format( + Locale.ROOT, + "Timed out waiting for transfer of index metadata to complete - %s", + toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + ), + ex + ); + Thread.currentThread().interrupt(); + throw exception; + } + if (exceptionList.size() > 0) { + IndexMetadataTransferException exception = new IndexMetadataTransferException( + String.format( + Locale.ROOT, + "Exception during transfer of IndexMetadata to Remote %s", + toUpload.stream().map(IndexMetadata::getIndex).map(Index::toString).collect(Collectors.joining("")) + ) + ); + exceptionList.forEach(exception::addSuppressed); + throw exception; + } + return result; + } + + /** + * Allows async Upload of IndexMetadata to remote + * @param clusterState current ClusterState + * @param indexMetadata {@link IndexMetadata} to upload + * @param latchedActionListener listener to respond back on after upload finishes + * @throws IOException + */ + private void writeIndexMetadataAsync( + ClusterState clusterState, + IndexMetadata indexMetadata, + LatchedActionListener latchedActionListener + ) throws IOException { + final BlobContainer indexMetadataContainer = indexMetadataContainer( + clusterState.getClusterName().value(), + clusterState.metadata().clusterUUID(), + indexMetadata.getIndexUUID() + ); + + ActionListener completionListener = ActionListener.wrap( + resp -> latchedActionListener.onResponse( + new UploadedIndexMetadata( + indexMetadata.getIndex().getName(), + indexMetadata.getIndexUUID(), + indexMetadataContainer.path().buildAsString() + indexMetadataFileName(indexMetadata) + ) + ), + ex -> 
+    /**
+     * Asynchronously uploads a single IndexMetadata to the remote store and notifies the listener once the upload completes.
+     *
+     * @param clusterState current ClusterState
+     * @param indexMetadata {@link IndexMetadata} to upload
+     * @param latchedActionListener listener to respond back on after upload finishes
+     * @throws IOException if the upload could not be started
+     */
+    private void writeIndexMetadataAsync(
+        ClusterState clusterState,
+        IndexMetadata indexMetadata,
+        LatchedActionListener<UploadedIndexMetadata> latchedActionListener
+    ) throws IOException {
+        final BlobContainer indexMetadataContainer = indexMetadataContainer(
+            clusterState.getClusterName().value(),
+            clusterState.metadata().clusterUUID(),
+            indexMetadata.getIndexUUID()
+        );
+
+        ActionListener<Void> completionListener = ActionListener.wrap(
+            resp -> latchedActionListener.onResponse(
+                new UploadedIndexMetadata(
+                    indexMetadata.getIndex().getName(),
+                    indexMetadata.getIndexUUID(),
+                    indexMetadataContainer.path().buildAsString() + indexMetadataFileName(indexMetadata)
+                )
+            ),
+            ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex))
+        );
+
+        INDEX_METADATA_FORMAT.writeAsync(
+            indexMetadata,
+            indexMetadataContainer,
+            indexMetadataFileName(indexMetadata),
+            blobStoreRepository.getCompressor(),
+            completionListener
+        );
+    }
+
     @Nullable
     public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterState, ClusterMetadataManifest previousManifest)
         throws IOException {
@@ -313,14 +414,6 @@ private ClusterMetadataManifest uploadManifest(
         }
     }
 
-    private String writeIndexMetadata(String clusterName, String clusterUUID, IndexMetadata uploadIndexMetadata, String fileName)
-        throws IOException {
-        final BlobContainer indexMetadataContainer = indexMetadataContainer(clusterName, clusterUUID, uploadIndexMetadata.getIndexUUID());
-        INDEX_METADATA_FORMAT.write(uploadIndexMetadata, indexMetadataContainer, fileName, blobStoreRepository.getCompressor());
-        // returning full path
-        return indexMetadataContainer.path().buildAsString() + fileName;
-    }
-
     private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
         throws IOException {
         final BlobContainer metadataManifestContainer = manifestContainer(clusterName, clusterUUID);
@@ -467,7 +560,22 @@ private ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String cluste
         }
     }
 
+    public static String encodeString(String content) {
+        return Base64.getUrlEncoder().withoutPadding().encodeToString(content.getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Exception thrown when the transfer of IndexMetadata to the remote store fails.
+     */
+    static class IndexMetadataTransferException extends RuntimeException {
+
+        public IndexMetadataTransferException(String errorDesc) {
+            super(errorDesc);
+        }
+
+        public IndexMetadataTransferException(String errorDesc, Throwable cause) {
+            super(errorDesc, cause);
+        }
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/codec/CodecAliases.java b/server/src/main/java/org/opensearch/index/codec/CodecAliases.java
new file mode 100644
index 0000000000000..066c092e86db8
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/codec/CodecAliases.java
@@ -0,0 +1,32 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index.codec;
+
+import org.apache.lucene.codecs.Codec;
+import org.opensearch.common.annotation.ExperimentalApi;
+
+import java.util.Set;
+
+/**
+ * An interface that supplies alternative names (aliases) for a {@link Codec}.
+ *
+ * @opensearch.internal
+ */
+@ExperimentalApi
+public interface CodecAliases {
+
+    /**
+     * Retrieves the set of aliases for a codec.
+     *
+     * @return A non-null set of alias strings. If no aliases are available, an empty set should be returned.
+     */
+    default Set<String> aliases() {
+        return Set.of();
+    }
+}
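`CodecAliases` is the extension point that lets the relocated ZStd plugin keep answering to its old `index.codec` values. A hypothetical plugin codec that opts in; the class, canonical name, and alias below are invented for the example, and a real codec must additionally be registered through Lucene's SPI so that `Codec.availableCodecs()` can find it:

```java
import java.util.Set;

import org.apache.lucene.codecs.Codec;
import org.apache.lucene.codecs.FilterCodec;
import org.opensearch.index.codec.CodecAliases;

// Invented example codec: registers under the canonical SPI name "CustomExample"
// while also accepting "custom_example" as an index.codec value via the alias set.
public final class CustomExampleCodec extends FilterCodec implements CodecAliases {

    public CustomExampleCodec() {
        super("CustomExample", Codec.getDefault()); // canonical name seen by Codec.forName()
    }

    @Override
    public Set<String> aliases() {
        return Set.of("custom_example"); // extra names accepted by the index.codec validator
    }
}
```

The `index.codec` validator in the `EngineConfig` hunk further below walks `Codec.availableCodecs()` and accepts any name found in such an alias set.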
diff --git a/server/src/main/java/org/opensearch/index/codec/CodecService.java b/server/src/main/java/org/opensearch/index/codec/CodecService.java
index a1913d5fa2061..775fc88b504f5 100644
--- a/server/src/main/java/org/opensearch/index/codec/CodecService.java
+++ b/server/src/main/java/org/opensearch/index/codec/CodecService.java
@@ -39,14 +39,10 @@
 import org.opensearch.common.Nullable;
 import org.opensearch.common.collect.MapBuilder;
 import org.opensearch.index.IndexSettings;
-import org.opensearch.index.codec.customcodecs.ZstdCodec;
-import org.opensearch.index.codec.customcodecs.ZstdNoDictCodec;
 import org.opensearch.index.mapper.MapperService;
 
 import java.util.Map;
 
-import static org.opensearch.index.engine.EngineConfig.INDEX_CODEC_COMPRESSION_LEVEL_SETTING;
-
 /**
  * Since Lucene 4.0 low level index segments are read and written through a
  * codec layer that allows to use use-case specific file formats &
@@ -67,27 +63,20 @@ public class CodecService {
      * the raw unfiltered lucene default. useful for testing
      */
     public static final String LUCENE_DEFAULT_CODEC = "lucene_default";
-    public static final String ZSTD_CODEC = "zstd";
-    public static final String ZSTD_NO_DICT_CODEC = "zstd_no_dict";
 
     public CodecService(@Nullable MapperService mapperService, IndexSettings indexSettings, Logger logger) {
         final MapBuilder<String, Codec> codecs = MapBuilder.newMapBuilder();
         assert null != indexSettings;
-        int compressionLevel = indexSettings.getValue(INDEX_CODEC_COMPRESSION_LEVEL_SETTING);
         if (mapperService == null) {
             codecs.put(DEFAULT_CODEC, new Lucene95Codec());
             codecs.put(LZ4, new Lucene95Codec());
             codecs.put(BEST_COMPRESSION_CODEC, new Lucene95Codec(Mode.BEST_COMPRESSION));
             codecs.put(ZLIB, new Lucene95Codec(Mode.BEST_COMPRESSION));
-            codecs.put(ZSTD_CODEC, new ZstdCodec(compressionLevel));
-            codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(compressionLevel));
         } else {
             codecs.put(DEFAULT_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
             codecs.put(LZ4, new PerFieldMappingPostingFormatCodec(Mode.BEST_SPEED, mapperService, logger));
             codecs.put(BEST_COMPRESSION_CODEC, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
             codecs.put(ZLIB, new PerFieldMappingPostingFormatCodec(Mode.BEST_COMPRESSION, mapperService, logger));
-            codecs.put(ZSTD_CODEC, new ZstdCodec(mapperService, logger, compressionLevel));
-            codecs.put(ZSTD_NO_DICT_CODEC, new ZstdNoDictCodec(mapperService, logger, compressionLevel));
         }
         codecs.put(LUCENE_DEFAULT_CODEC, Codec.getDefault());
         for (String codec : Codec.availableCodecs()) {
diff --git a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
index 3351931a6b068..00f8ff6d3cd40 100644
--- a/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
+++ b/server/src/main/java/org/opensearch/index/engine/EngineConfig.java
@@ -49,6 +49,7 @@
 import org.opensearch.core.index.shard.ShardId;
 import org.opensearch.core.indices.breaker.CircuitBreakerService;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.codec.CodecAliases;
 import org.opensearch.index.codec.CodecService;
 import org.opensearch.index.codec.CodecSettings;
 import org.opensearch.index.mapper.ParsedDocument;
@@ -133,18 +134,26 @@ public Supplier<RetentionLeases> retentionLeasesSupplier() {
         case "lz4":
         case "best_compression":
         case "zlib":
-        case "zstd":
-        case "zstd_no_dict":
         case "lucene_default":
             return s;
         default:
-
if (Codec.availableCodecs().contains(s) == false) { // we don't error message the not officially supported ones - throw new IllegalArgumentException( - "unknown value for [index.codec] must be one of [default, lz4, best_compression, zlib, zstd, zstd_no_dict] but was: " - + s - ); + if (Codec.availableCodecs().contains(s)) { + return s; } - return s; + + for (String codecName : Codec.availableCodecs()) { + Codec codec = Codec.forName(codecName); + if (codec instanceof CodecAliases) { + CodecAliases codecWithAlias = (CodecAliases) codec; + if (codecWithAlias.aliases().contains(s)) { + return s; + } + } + } + + throw new IllegalArgumentException( + "unknown value for [index.codec] must be one of [default, lz4, best_compression, zlib] but was: " + s + ); } }, Property.IndexScope, Property.NodeScope); @@ -181,9 +190,6 @@ public void validate(String key, Object value, Object dependency) { private static void doValidateCodecSettings(final String codec) { switch (codec) { - case "zstd": - case "zstd_no_dict": - return; case "best_compression": case "zlib": case "lucene_default": @@ -198,6 +204,18 @@ private static void doValidateCodecSettings(final String codec) { return; } } + for (String codecName : Codec.availableCodecs()) { + Codec availableCodec = Codec.forName(codecName); + if (availableCodec instanceof CodecAliases) { + CodecAliases availableCodecWithAlias = (CodecAliases) availableCodec; + if (availableCodecWithAlias.aliases().contains(codec)) { + if (availableCodec instanceof CodecSettings + && ((CodecSettings) availableCodec).supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING)) { + return; + } + } + } + } } throw new IllegalArgumentException("Compression level cannot be set for the " + codec + " codec."); } @@ -238,6 +256,7 @@ private EngineConfig(Builder builder) { this.codecService = builder.codecService; this.eventListener = builder.eventListener; codecName = builder.indexSettings.getValue(INDEX_CODEC_SETTING); + // We need to make the indexing buffer for this shard at least as large // as the amount of memory that is available for all engines on the // local node so that decisions to flush segments to disk are made by diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0ad82d23e0cc4..3cd2fb231e284 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2336,6 +2336,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t } private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier, boolean syncFromRemote) throws IOException { + syncFromRemote = syncFromRemote && indexSettings.isRemoteSnapshot() == false; assert Thread.holdsLock(mutex) == false : "opening engine under mutex"; if (state != IndexShardState.RECOVERING) { throw new IndexShardNotRecoveringException(shardId, state); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 5250ce6230ffa..594b7f99cd85a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.apache.lucene.codecs.CodecUtil; import 
org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.Directory; import org.apache.lucene.store.IOContext; @@ -29,7 +28,6 @@ import org.opensearch.common.blobstore.transfer.RemoteTransferContainer; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; -import org.opensearch.common.util.ByteUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.index.store.exception.ChecksumCombinationException; @@ -47,9 +45,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import java.util.zip.CRC32; -import com.jcraft.jzlib.JZlib; +import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum; /** * A {@code RemoteDirectory} provides an abstraction layer for storing a list of files to a remote store. @@ -401,11 +398,8 @@ private void uploadBlob( private long calculateChecksumOfChecksum(Directory directory, String file) throws IOException { try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) { - long storedChecksum = CodecUtil.retrieveChecksum(indexInput); - CRC32 checksumOfChecksum = new CRC32(); - checksumOfChecksum.update(ByteUtils.toByteArrayBE(storedChecksum)); try { - return JZlib.crc32_combine(storedChecksum, checksumOfChecksum.getValue(), SEGMENT_CHECKSUM_BYTES); + return checksumOfChecksum(indexInput, SEGMENT_CHECKSUM_BYTES); } catch (Exception e) { throw new ChecksumCombinationException( "Potentially corrupted file: Checksum combination failed while combining stored checksum " diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index b514089966484..9cb171790dbe2 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -8,6 +8,7 @@ package org.opensearch.node.remotestore; +import org.opensearch.cluster.metadata.CryptoMetadata; import org.opensearch.cluster.metadata.RepositoriesMetadata; import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -36,6 +37,11 @@ public class RemoteStoreNodeAttribute { public static final String REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.segment.repository"; public static final String REMOTE_STORE_TRANSLOG_REPOSITORY_NAME_ATTRIBUTE_KEY = "remote_store.translog.repository"; public static final String REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s.type"; + public static final String REMOTE_STORE_REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT = "remote_store.repository.%s." + + CryptoMetadata.CRYPTO_METADATA_KEY; + public static final String REMOTE_STORE_REPOSITORY_CRYPTO_SETTINGS_PREFIX = REMOTE_STORE_REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT + + "." 
+        + CryptoMetadata.SETTINGS_KEY;
     public static final String REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX = "remote_store.repository.%s.settings.";
 
     private final RepositoriesMetadata repositoriesMetadata;
@@ -55,6 +61,34 @@ private String validateAttributeNonNull(DiscoveryNode node, String attributeKey)
         return attributeValue;
     }
 
+    private CryptoMetadata buildCryptoMetadata(DiscoveryNode node, String repositoryName) {
+        String metadataKey = String.format(Locale.getDefault(), REMOTE_STORE_REPOSITORY_CRYPTO_ATTRIBUTE_KEY_FORMAT, repositoryName);
+        boolean isRepoEncrypted = node.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(metadataKey));
+        if (isRepoEncrypted == false) {
+            return null;
+        }
+
+        String keyProviderName = validateAttributeNonNull(node, metadataKey + "." + CryptoMetadata.KEY_PROVIDER_NAME_KEY);
+        String keyProviderType = validateAttributeNonNull(node, metadataKey + "." + CryptoMetadata.KEY_PROVIDER_TYPE_KEY);
+
+        String settingsAttributeKeyPrefix = String.format(
+            Locale.getDefault(),
+            REMOTE_STORE_REPOSITORY_CRYPTO_SETTINGS_PREFIX,
+            repositoryName
+        );
+
+        Map<String, String> settingsMap = node.getAttributes()
+            .keySet()
+            .stream()
+            .filter(key -> key.startsWith(settingsAttributeKeyPrefix))
+            .collect(Collectors.toMap(key -> key.replace(settingsAttributeKeyPrefix + ".", ""), key -> node.getAttributes().get(key)));
+
+        Settings.Builder settings = Settings.builder();
+        settingsMap.forEach(settings::put);
+
+        return new CryptoMetadata(keyProviderName, keyProviderType, settings.build());
+    }
+
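`buildCryptoMetadata` above derives all of its inputs from node attribute keys built out of the `remote_store.repository.%s.crypto_metadata` format. A small self-contained sketch of how those keys expand for one repository; the repository name and provider values are illustrative:

```java
import java.util.Locale;
import java.util.Map;

// Sketch of the node-attribute key layout consumed by buildCryptoMetadata above.
// The repository name "remote-store-A" and the provider values are illustrative.
public class CryptoAttributeKeysSketch {
    static final String CRYPTO_KEY_FORMAT = "remote_store.repository.%s.crypto_metadata";

    public static void main(String[] args) {
        String metadataKey = String.format(Locale.ROOT, CRYPTO_KEY_FORMAT, "remote-store-A");
        Map<String, String> nodeAttributes = Map.of(
            metadataKey + ".key_provider_name", "store-test",
            metadataKey + ".key_provider_type", "aws-kms",
            metadataKey + ".settings.region", "us-east-1"
        );

        // The presence of any attribute under the crypto_metadata prefix marks the
        // repository as encrypted; provider name and type are then mandatory.
        boolean encrypted = nodeAttributes.keySet().stream().anyMatch(key -> key.startsWith(metadataKey));
        System.out.println(metadataKey + " -> encrypted=" + encrypted);
    }
}
```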
     private Map<String, String> validateSettingsAttributesNonNull(DiscoveryNode node, String repositoryName) {
         String settingsAttributeKeyPrefix = String.format(
@@ -86,10 +120,12 @@ private RepositoryMetadata buildRepositoryMetadata(DiscoveryNode node, String na
         Settings.Builder settings = Settings.builder();
         settingsMap.forEach(settings::put);
 
+        CryptoMetadata cryptoMetadata = buildCryptoMetadata(node, name);
+
+        // Repository metadata built here will always be for a system repository.
         settings.put(BlobStoreRepository.SYSTEM_REPOSITORY_SETTING.getKey(), true);
 
-        return new RepositoryMetadata(name, type, settings.build());
+        return new RepositoryMetadata(name, type, settings.build(), cryptoMetadata);
     }
 
     private RepositoriesMetadata buildRepositoriesMetadata(DiscoveryNode node) {
diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
index 9048757405108..744e2fbd1bfc7 100644
--- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
+++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java
@@ -42,7 +42,11 @@
 import org.apache.lucene.util.BytesRef;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.common.CheckedFunction;
+import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
 import org.opensearch.common.blobstore.BlobContainer;
+import org.opensearch.common.blobstore.stream.write.WritePriority;
+import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
+import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
 import org.opensearch.common.io.Streams;
 import org.opensearch.common.io.stream.BytesStreamOutput;
 import org.opensearch.common.lucene.store.ByteArrayIndexInput;
@@ -50,6 +54,7 @@
 import org.opensearch.common.xcontent.LoggingDeprecationHandler;
 import org.opensearch.common.xcontent.XContentHelper;
 import org.opensearch.common.xcontent.XContentType;
+import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.compress.Compressor;
 import org.opensearch.core.xcontent.MediaTypeRegistry;
@@ -58,6 +63,7 @@
 import org.opensearch.core.xcontent.XContentBuilder;
 import org.opensearch.core.xcontent.XContentParser;
 import org.opensearch.gateway.CorruptStateException;
+import org.opensearch.index.store.exception.ChecksumCombinationException;
 import org.opensearch.snapshots.SnapshotInfo;
 
 import java.io.IOException;
@@ -67,6 +73,8 @@
 import java.util.Locale;
 import java.util.Map;
 
+import static org.opensearch.common.blobstore.transfer.RemoteTransferContainer.checksumOfChecksum;
+
 /**
  * Snapshot metadata file format used in v2.0 and above
  *
@@ -167,6 +175,61 @@ public void write(final T obj, final BlobContainer blobContainer, final String n
         blobContainer.writeBlob(blobName, bytes.streamInput(), bytes.length(), false);
     }
 
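The `writeAsync` path added below needs an expected checksum up front so the multipart upload can be verified remotely, and it derives one from the blob's own footer checksum. The sketch below mirrors that "checksum of checksum" idea with plain JDK classes only; `RemoteTransferContainer.checksumOfChecksum` itself works on the Lucene footer checksum and uses a CRC combine, so this is illustrative rather than the exact computation:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Illustrative "checksum of checksum": CRC32 the blob, then CRC32 the 8-byte encoding
// of that checksum. This mirrors the idea behind the expectedChecksum below, not the
// exact RemoteTransferContainer.checksumOfChecksum implementation.
public class ChecksumOfChecksumSketch {
    public static void main(String[] args) {
        byte[] blob = "serialized index metadata".getBytes(StandardCharsets.UTF_8);

        CRC32 full = new CRC32();
        full.update(blob);
        long storedChecksum = full.getValue(); // stands in for the footer checksum of the blob

        CRC32 again = new CRC32();
        again.update(ByteBuffer.allocate(Long.BYTES).putLong(storedChecksum).array());
        long checksumOfChecksum = again.getValue();

        System.out.printf("stored=%d checksumOfChecksum=%d%n", storedChecksum, checksumOfChecksum);
    }
}
```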
+    /**
+     * Writes a blob, resolving the blob name via {@link #blobName}, and leverages multipart upload when the
+     * {@code blobContainer} supports it; otherwise it falls back to the synchronous {@link #write} path.
+     *
+     * @param obj object to be serialized
+     * @param blobContainer blob container
+     * @param name blob name
+     * @param compressor the {@link Compressor} to apply to the serialized bytes
+     * @param listener listener notified with the result of the write
+     */
+    public void writeAsync(
+        final T obj,
+        final BlobContainer blobContainer,
+        final String name,
+        final Compressor compressor,
+        ActionListener<Void> listener
+    ) throws IOException {
+        if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
+            write(obj, blobContainer, name, compressor);
+            listener.onResponse(null);
+            return;
+        }
+        final String blobName = blobName(name);
+        final BytesReference bytes = serialize(obj, blobName, compressor);
+        final String resourceDescription = "ChecksumBlobStoreFormat.writeAsync(blob=\"" + blobName + "\")";
+        try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) {
+            long expectedChecksum;
+            try {
+                expectedChecksum = checksumOfChecksum(input.clone(), 8);
+            } catch (Exception e) {
+                throw new ChecksumCombinationException(
+                    "Potentially corrupted file: Checksum combination failed while combining stored checksum "
+                        + "and calculated checksum of stored checksum",
+                    resourceDescription,
+                    e
+                );
+            }
+
+            try (
+                RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
+                    blobName,
+                    blobName,
+                    bytes.length(),
+                    true,
+                    WritePriority.HIGH,
+                    (size, position) -> new OffsetRangeIndexInputStream(input, size, position),
+                    expectedChecksum,
+                    true
+                )
+            ) {
+                ((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(remoteTransferContainer.createWriteContext(), listener);
+            }
+        }
+    }
+
     public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
         try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
             try (
diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
index d4e090b046760..1f4c32b59f183 100644
--- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
+++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java
@@ -15,14 +15,21 @@
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.node.DiscoveryNodes;
+import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
 import org.opensearch.common.blobstore.BlobContainer;
 import org.opensearch.common.blobstore.BlobMetadata;
 import org.opensearch.common.blobstore.BlobPath;
 import org.opensearch.common.blobstore.BlobStore;
+import org.opensearch.common.blobstore.stream.write.WriteContext;
+import org.opensearch.common.blobstore.stream.write.WritePriority;
 import org.opensearch.common.blobstore.support.PlainBlobMetadata;
+import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
 import org.opensearch.common.compress.DeflateCompressor;
+import org.opensearch.common.lucene.store.ByteArrayIndexInput;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.core.common.bytes.BytesReference;
 import org.opensearch.core.index.Index;
 import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -47,12 +54,14 @@
 import java.util.Map;
 import java.util.function.Supplier;
 
-import
org.mockito.ArgumentMatchers; +import org.mockito.ArgumentCaptor; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -146,6 +155,78 @@ public void testWriteFullMetadataSuccess() throws IOException { assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); } + public void testWriteFullMetadataInParallelSuccess() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); + + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onResponse(null); + return null; + }).when(container).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); + + final ClusterMetadataManifest manifest = remoteClusterStateService.writeFullMetadata(clusterState); + + final UploadedIndexMetadata uploadedIndexMetadata = new UploadedIndexMetadata("test-index", "index-uuid", "metadata-filename"); + List indices = List.of(uploadedIndexMetadata); + + final ClusterMetadataManifest expectedManifest = ClusterMetadataManifest.builder() + .indices(indices) + .clusterTerm(1L) + .stateVersion(1L) + .stateUUID("state-uuid") + .clusterUUID("cluster-uuid") + .build(); + + assertThat(manifest.getIndices().size(), is(1)); + assertThat(manifest.getIndices().get(0).getIndexName(), is(uploadedIndexMetadata.getIndexName())); + assertThat(manifest.getIndices().get(0).getIndexUUID(), is(uploadedIndexMetadata.getIndexUUID())); + assertThat(manifest.getIndices().get(0).getUploadedFilename(), notNullValue()); + assertThat(manifest.getClusterTerm(), is(expectedManifest.getClusterTerm())); + assertThat(manifest.getStateVersion(), is(expectedManifest.getStateVersion())); + assertThat(manifest.getClusterUUID(), is(expectedManifest.getClusterUUID())); + assertThat(manifest.getStateUUID(), is(expectedManifest.getStateUUID())); + + assertEquals(actionListenerArgumentCaptor.getAllValues().size(), 1); + assertEquals(writeContextArgumentCaptor.getAllValues().size(), 1); + + WriteContext capturedWriteContext = writeContextArgumentCaptor.getValue(); + byte[] writtenBytes = capturedWriteContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream().readAllBytes(); + IndexMetadata writtenIndexMetadata = RemoteClusterStateService.INDEX_METADATA_FORMAT.deserialize( + capturedWriteContext.getFileName(), + blobStoreRepository.getNamedXContentRegistry(), + new BytesArray(writtenBytes) + ); + + assertEquals(capturedWriteContext.getWritePriority(), WritePriority.HIGH); + assertEquals(writtenIndexMetadata.getNumberOfShards(), 1); + assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0); + assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index"); + assertEquals(writtenIndexMetadata.getIndex().getUUID(), "index-uuid"); + long expectedChecksum = RemoteTransferContainer.checksumOfChecksum(new 
ByteArrayIndexInput("metadata-filename", writtenBytes), 8); + assertEquals(capturedWriteContext.getExpectedChecksum().longValue(), expectedChecksum); + + } + + public void testWriteFullMetadataInParallelFailure() throws IOException { + final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build(); + AsyncMultiStreamBlobContainer container = (AsyncMultiStreamBlobContainer) mockBlobStoreObjects(AsyncMultiStreamBlobContainer.class); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + + doAnswer((i) -> { + actionListenerArgumentCaptor.getValue().onFailure(new RuntimeException("Cannot upload to remote")); + return null; + }).when(container).asyncBlobUpload(any(WriteContext.class), actionListenerArgumentCaptor.capture()); + + assertThrows( + RemoteClusterStateService.IndexMetadataTransferException.class, + () -> remoteClusterStateService.writeFullMetadata(clusterState) + ); + } + public void testFailWriteIncrementalMetadataNonClusterManagerNode() throws IOException { final ClusterState clusterState = generateClusterStateWithOneIndex().build(); final ClusterMetadataManifest manifest = remoteClusterStateService.writeIncrementalMetadata(clusterState, clusterState, null); @@ -426,13 +507,17 @@ public void testMarkLastStateAsCommittedSuccess() throws IOException { } private BlobContainer mockBlobStoreObjects() { + return mockBlobStoreObjects(BlobContainer.class); + } + + private BlobContainer mockBlobStoreObjects(Class blobContainerClazz) { final BlobPath blobPath = mock(BlobPath.class); when((blobStoreRepository.basePath())).thenReturn(blobPath); when(blobPath.add(anyString())).thenReturn(blobPath); when(blobPath.buildAsString()).thenReturn("/blob/path/"); - final BlobContainer blobContainer = mock(BlobContainer.class); + final BlobContainer blobContainer = mock(blobContainerClazz); when(blobContainer.path()).thenReturn(blobPath); - when(blobStore.blobContainer(ArgumentMatchers.any())).thenReturn(blobContainer); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); return blobContainer; } diff --git a/server/src/test/java/org/opensearch/index/codec/CodecTests.java b/server/src/test/java/org/opensearch/index/codec/CodecTests.java index 3ca759eaf0781..6f9ff78aa0143 100644 --- a/server/src/test/java/org/opensearch/index/codec/CodecTests.java +++ b/server/src/test/java/org/opensearch/index/codec/CodecTests.java @@ -48,8 +48,6 @@ import org.opensearch.env.Environment; import org.opensearch.index.IndexSettings; import org.opensearch.index.analysis.IndexAnalyzers; -import org.opensearch.index.codec.customcodecs.Lucene95CustomCodec; -import org.opensearch.index.codec.customcodecs.Lucene95CustomStoredFieldsFormat; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.similarity.SimilarityService; @@ -95,49 +93,7 @@ public void testZlib() throws Exception { assert codec instanceof PerFieldMappingPostingFormatCodec; } - public void testZstd() throws Exception { - Codec codec = createCodecService(false).codec("zstd"); - assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); - Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); - assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); - } - - public void 
testZstdNoDict() throws Exception { - Codec codec = createCodecService(false).codec("zstd_no_dict"); - assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); - Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); - assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); - } - - public void testZstdWithCompressionLevel() throws Exception { - int randomCompressionLevel = randomIntBetween(1, 6); - Codec codec = createCodecService(randomCompressionLevel, "zstd").codec("zstd"); - assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); - Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); - assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionLevel()); - } - - public void testZstdNoDictWithCompressionLevel() throws Exception { - int randomCompressionLevel = randomIntBetween(1, 6); - Codec codec = createCodecService(randomCompressionLevel, "zstd_no_dict").codec("zstd_no_dict"); - assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); - Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); - assertEquals(randomCompressionLevel, storedFieldsFormat.getCompressionLevel()); - } - public void testBestCompressionWithCompressionLevel() { - final Settings zstdSettings = Settings.builder() - .put(INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getKey(), randomIntBetween(1, 6)) - .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), randomFrom(CodecService.ZSTD_CODEC, CodecService.ZSTD_NO_DICT_CODEC)) - .build(); - - // able to validate zstd - final IndexScopedSettings zstdIndexScopedSettings = new IndexScopedSettings( - zstdSettings, - IndexScopedSettings.BUILT_IN_INDEX_SETTINGS - ); - zstdIndexScopedSettings.validate(zstdSettings, true); - final Settings settings = Settings.builder() .put(INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getKey(), randomIntBetween(1, 6)) .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), randomFrom(CodecService.DEFAULT_CODEC, CodecService.BEST_COMPRESSION_CODEC)) @@ -152,17 +108,6 @@ public void testLuceneCodecsWithCompressionLevel() { String codecName = randomFrom(Codec.availableCodecs()); Codec codec = Codec.forName(codecName); - final Settings customCodecSettings = Settings.builder() - .put(INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getKey(), randomIntBetween(1, 6)) - .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), "Lucene95CustomCodec") - .build(); - - final IndexScopedSettings customCodecIndexScopedSettings = new IndexScopedSettings( - customCodecSettings, - IndexScopedSettings.BUILT_IN_INDEX_SETTINGS - ); - customCodecIndexScopedSettings.validate(customCodecSettings, true); - final Settings settings = Settings.builder() .put(INDEX_CODEC_COMPRESSION_LEVEL_SETTING.getKey(), randomIntBetween(1, 6)) .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName) @@ -178,14 +123,6 @@ public void testLuceneCodecsWithCompressionLevel() { } } - public void testZstandardCompressionLevelSupport() throws Exception { - CodecService codecService = createCodecService(false); - CodecSettings zstdCodec = (CodecSettings) codecService.codec("zstd"); - CodecSettings zstdNoDictCodec = (CodecSettings) codecService.codec("zstd_no_dict"); - assertTrue(zstdCodec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING)); - assertTrue(zstdNoDictCodec.supports(INDEX_CODEC_COMPRESSION_LEVEL_SETTING)); - } - public 
void testDefaultMapperServiceNull() throws Exception { Codec codec = createCodecService(true).codec("default"); assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_SPEED, codec); @@ -196,20 +133,6 @@ public void testBestCompressionMapperServiceNull() throws Exception { assertStoredFieldsCompressionEquals(Lucene95Codec.Mode.BEST_COMPRESSION, codec); } - public void testZstdMapperServiceNull() throws Exception { - Codec codec = createCodecService(true).codec("zstd"); - assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD, codec); - Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); - assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); - } - - public void testZstdNoDictMapperServiceNull() throws Exception { - Codec codec = createCodecService(true).codec("zstd_no_dict"); - assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode.ZSTD_NO_DICT, codec); - Lucene95CustomStoredFieldsFormat storedFieldsFormat = (Lucene95CustomStoredFieldsFormat) codec.storedFieldsFormat(); - assertEquals(Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL, storedFieldsFormat.getCompressionLevel()); - } - public void testExceptionCodecNull() { assertThrows(IllegalArgumentException.class, () -> createCodecService(true).codec(null)); } @@ -226,13 +149,6 @@ private void assertStoredFieldsCompressionEquals(Lucene95Codec.Mode expected, Co assertEquals(expected, Lucene95Codec.Mode.valueOf(v)); } - private void assertStoredFieldsCompressionEquals(Lucene95CustomCodec.Mode expected, Codec actual) throws Exception { - SegmentReader sr = getSegmentReader(actual); - String v = sr.getSegmentInfo().info.getAttribute(Lucene95CustomStoredFieldsFormat.MODE_KEY); - assertNotNull(v); - assertEquals(expected, Lucene95CustomCodec.Mode.valueOf(v)); - } - private CodecService createCodecService(boolean isMapperServiceNull) throws IOException { Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); if (isMapperServiceNull) { @@ -241,15 +157,6 @@ private CodecService createCodecService(boolean isMapperServiceNull) throws IOEx return buildCodecService(nodeSettings); } - private CodecService createCodecService(int randomCompressionLevel, String codec) throws IOException { - Settings nodeSettings = Settings.builder() - .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()) - .put("index.codec", codec) - .put("index.codec.compression_level", randomCompressionLevel) - .build(); - return buildCodecService(nodeSettings); - } - private CodecService buildCodecService(Settings nodeSettings) throws IOException { IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("_na", nodeSettings); diff --git a/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java b/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java new file mode 100644 index 0000000000000..5206064273334 --- /dev/null +++ b/server/src/test/java/org/opensearch/node/RemoteStoreNodeAttributeTests.java @@ -0,0 +1,126 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.node; + +import org.opensearch.Version; +import org.opensearch.cluster.metadata.CryptoMetadata; +import org.opensearch.cluster.metadata.RepositoryMetadata; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; +import org.opensearch.test.OpenSearchTestCase; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.Map; + +import static java.util.Collections.emptySet; + +public class RemoteStoreNodeAttributeTests extends OpenSearchTestCase { + + static private final String KEY_ARN = "arn:aws:kms:us-east-1:123456789:key/6e9aa906-2cc3-4924-8ded-f385c78d9dcf"; + static private final String REGION = "us-east-1"; + + public void testCryptoMetadata() throws UnknownHostException { + Map attr = Map.of( + "remote_store.segment.repository", + "remote-store-A", + "remote_store.translog.repository", + "remote-store-A", + "remote_store.repository.remote-store-A.type", + "s3", + "remote_store.repository.remote-store-A.settings.bucket", + "abc", + "remote_store.repository.remote-store-A.settings.base_path", + "xyz", + "remote_store.repository.remote-store-A.crypto_metadata.key_provider_name", + "store-test", + "remote_store.repository.remote-store-A.crypto_metadata.key_provider_type", + "aws-kms", + "remote_store.repository.remote-store-A.crypto_metadata.settings.region", + REGION, + "remote_store.repository.remote-store-A.crypto_metadata.settings.key_arn", + KEY_ARN + ); + DiscoveryNode node = new DiscoveryNode( + "C", + new TransportAddress(InetAddress.getByName("localhost"), 9876), + attr, + emptySet(), + Version.CURRENT + ); + + RemoteStoreNodeAttribute remoteStoreNodeAttribute = new RemoteStoreNodeAttribute(node); + assertEquals(remoteStoreNodeAttribute.getRepositoriesMetadata().repositories().size(), 1); + RepositoryMetadata repositoryMetadata = remoteStoreNodeAttribute.getRepositoriesMetadata().repositories().get(0); + Settings.Builder settings = Settings.builder(); + settings.put("region", REGION); + settings.put("key_arn", KEY_ARN); + CryptoMetadata cryptoMetadata = new CryptoMetadata("store-test", "aws-kms", settings.build()); + assertEquals(cryptoMetadata, repositoryMetadata.cryptoMetadata()); + } + + public void testInvalidCryptoMetadata() throws UnknownHostException { + Map attr = Map.of( + "remote_store.segment.repository", + "remote-store-A", + "remote_store.translog.repository", + "remote-store-A", + "remote_store.repository.remote-store-A.type", + "s3", + "remote_store.repository.remote-store-A.settings.bucket", + "abc", + "remote_store.repository.remote-store-A.settings.base_path", + "xyz", + "remote_store.repository.remote-store-A.crypto_metadata.key_provider_name", + "store-test", + "remote_store.repository.remote-store-A.crypto_metadata.settings.region", + REGION, + "remote_store.repository.remote-store-A.crypto_metadata.settings.key_arn", + KEY_ARN + ); + DiscoveryNode node = new DiscoveryNode( + "C", + new TransportAddress(InetAddress.getByName("localhost"), 9876), + attr, + emptySet(), + Version.CURRENT + ); + + assertThrows(IllegalStateException.class, () -> new RemoteStoreNodeAttribute(node)); + } + + public void testNoCryptoMetadata() throws UnknownHostException { + Map attr = Map.of( + "remote_store.segment.repository", + "remote-store-A", + "remote_store.translog.repository", + "remote-store-A", + "remote_store.repository.remote-store-A.type", 
+ "s3", + "remote_store.repository.remote-store-A.settings.bucket", + "abc", + "remote_store.repository.remote-store-A.settings.base_path", + "xyz" + ); + DiscoveryNode node = new DiscoveryNode( + "C", + new TransportAddress(InetAddress.getByName("localhost"), 9876), + attr, + emptySet(), + Version.CURRENT + ); + + RemoteStoreNodeAttribute remoteStoreNodeAttribute = new RemoteStoreNodeAttribute(node); + assertEquals(remoteStoreNodeAttribute.getRepositoriesMetadata().repositories().size(), 1); + RepositoryMetadata repositoryMetadata = remoteStoreNodeAttribute.getRepositoriesMetadata().repositories().get(0); + assertNull(repositoryMetadata.cryptoMetadata()); + } +} diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index 93be194b2d112..03f0d27188027 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -34,14 +34,19 @@ import org.opensearch.OpenSearchCorruptionException; import org.opensearch.OpenSearchParseException; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; +import org.opensearch.common.blobstore.stream.read.ReadContext; +import org.opensearch.common.blobstore.stream.write.WriteContext; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.compress.CompressorRegistry; @@ -56,7 +61,9 @@ import java.io.EOFException; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; import java.util.Map; +import java.util.concurrent.CountDownLatch; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; @@ -114,6 +121,57 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par } } + public void testBlobStoreAsyncOperations() throws IOException, InterruptedException { + BlobStore blobStore = createTestBlobStore(); + MockFsVerifyingBlobContainer mockBlobContainer = new MockFsVerifyingBlobContainer( + (FsBlobStore) blobStore, + BlobPath.cleanPath(), + null + ); + ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); + + CountDownLatch latch = new CountDownLatch(2); + + ActionListener actionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("---> Async write succeeded"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.info("---> Failure in async write"); + throw new RuntimeException("async write should not fail"); + } + }; + + // Write blobs in different formats + checksumSMILE.writeAsync( + new BlobObj("checksum smile"), + mockBlobContainer, + "check-smile", + CompressorRegistry.none(), + actionListener + ); + checksumSMILE.writeAsync( + new BlobObj("checksum smile compressed"), + mockBlobContainer, + "check-smile-comp", 
+ CompressorRegistry.getCompressor(DeflateCompressor.NAME), + actionListener + ); + + latch.await(); + + // Assert that all checksum blobs can be read + assertEquals(checksumSMILE.read(mockBlobContainer.getDelegate(), "check-smile", xContentRegistry()).getText(), "checksum smile"); + assertEquals( + checksumSMILE.read(mockBlobContainer.getDelegate(), "check-smile-comp", xContentRegistry()).getText(), + "checksum smile compressed" + ); + } + public void testBlobStoreOperations() throws IOException { BlobStore blobStore = createTestBlobStore(); BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath()); @@ -196,4 +254,35 @@ private long checksum(byte[] buffer) throws IOException { } } } + + public static class MockFsVerifyingBlobContainer extends FsBlobContainer implements AsyncMultiStreamBlobContainer { + + private BlobContainer delegate; + + public MockFsVerifyingBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) { + super(blobStore, blobPath, path); + delegate = blobStore.blobContainer(BlobPath.cleanPath()); + } + + @Override + public void asyncBlobUpload(WriteContext writeContext, ActionListener completionListener) throws IOException { + InputStream inputStream = writeContext.getStreamProvider(Integer.MAX_VALUE).provideStream(0).getInputStream(); + delegate.writeBlob(writeContext.getFileName(), inputStream, writeContext.getFileSize(), true); + completionListener.onResponse(null); + } + + @Override + public void readBlobAsync(String blobName, ActionListener listener) { + throw new RuntimeException("read not supported"); + } + + @Override + public boolean remoteIntegrityCheckSupported() { + return false; + } + + public BlobContainer getDelegate() { + return delegate; + } + } } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 3232ce7ddb87b..63fe5ec03b059 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -283,9 +283,7 @@ public abstract class OpenSearchIntegTestCase extends OpenSearchTestCase { CodecService.DEFAULT_CODEC, CodecService.LZ4, CodecService.BEST_COMPRESSION_CODEC, - CodecService.ZLIB, - CodecService.ZSTD_CODEC, - CodecService.ZSTD_NO_DICT_CODEC + CodecService.ZLIB ); /**