diff --git a/CHANGELOG.md b/CHANGELOG.md index 08ae27bb583fa..e0a1316eb8b2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Enable sort optimization for all NumericTypes ([#6464](https://github.com/opensearch-project/OpenSearch/pull/6464) - Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331)) - Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834)) +- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490)) @@ -104,6 +105,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `net.minidev:json-smart` from 2.4.7 to 2.4.10 - Bump `org.apache.maven:maven-model` from 3.6.2 to 3.9.1 - Bump `org.codehaus.jettison:jettison` from 1.5.3 to 1.5.4 ([#6878](https://github.com/opensearch-project/OpenSearch/pull/6878)) +- Add `com.github.luben:zstd-jni:1.5.4-1` ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577)) ### Changed - Require MediaType in Strings.toString API ([#6009](https://github.com/opensearch-project/OpenSearch/pull/6009)) diff --git a/sandbox/modules/custom-codecs/build.gradle b/sandbox/modules/custom-codecs/build.gradle new file mode 100644 index 0000000000000..bf1bc719b0ae6 --- /dev/null +++ b/sandbox/modules/custom-codecs/build.gradle @@ -0,0 +1,28 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.yaml-rest-test' + +opensearchplugin { + name 'custom-codecs' + description 'A plugin that implements custom compression codecs.' + classname 'org.opensearch.index.codec.customcodecs.CustomCodecPlugin' + licenseFile rootProject.file('licenses/APACHE-LICENSE-2.0.txt') + noticeFile rootProject.file('NOTICE.txt') +} + +dependencies { + api "com.github.luben:zstd-jni:1.5.4-1" +} + +yamlRestTest.enabled = false; +testingConventions.enabled = false; diff --git a/sandbox/modules/custom-codecs/licenses/zstd-jni-1.5.4-1.jar.sha1 b/sandbox/modules/custom-codecs/licenses/zstd-jni-1.5.4-1.jar.sha1 new file mode 100644 index 0000000000000..e95377f702a6c --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/zstd-jni-1.5.4-1.jar.sha1 @@ -0,0 +1 @@ +291ccaacc039e41932de877303edb6af98a91c24 diff --git a/sandbox/modules/custom-codecs/licenses/zstd-jni-LICENSE.txt b/sandbox/modules/custom-codecs/licenses/zstd-jni-LICENSE.txt new file mode 100644 index 0000000000000..c4dd507c1c72f --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/zstd-jni-LICENSE.txt @@ -0,0 +1,29 @@ +----------------------------------------------------------------------------- +** Beginning of "BSD License" text. ** + +Zstd-jni: JNI bindings to Zstd Library + +Copyright (c) 2015-present, Luben Karavelov/ All rights reserved. + +BSD License + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, this + list of conditions and the following disclaimer in the documentation and/or + other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/sandbox/modules/custom-codecs/licenses/zstd-jni-NOTICE.txt b/sandbox/modules/custom-codecs/licenses/zstd-jni-NOTICE.txt new file mode 100644 index 0000000000000..389c97cbc892d --- /dev/null +++ b/sandbox/modules/custom-codecs/licenses/zstd-jni-NOTICE.txt @@ -0,0 +1 @@ +The code for the JNI bindings to Zstd library was originally authored by Luben Karavelov diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java new file mode 100644 index 0000000000000..1e0245f3c8c6b --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecPlugin.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.plugins.Plugin; +import org.opensearch.plugins.EnginePlugin; +import org.opensearch.index.codec.CodecServiceFactory; +import org.opensearch.index.IndexSettings; + +import java.util.Optional; + +/** + * A plugin that implements custom codecs. Supports these codecs: + * + * + * @opensearch.internal + */ +public final class CustomCodecPlugin extends Plugin implements EnginePlugin { + + /** Creates a new instance */ + public CustomCodecPlugin() {} + + /** + * @param indexSettings is the default indexSettings + * @return the engine factory + */ + @Override + public Optional getCustomCodecServiceFactory(final IndexSettings indexSettings) { + return Optional.of(new CustomCodecServiceFactory()); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java new file mode 100644 index 0000000000000..4dd25caa86d94 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecService.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.codecs.Codec; +import org.opensearch.common.collect.MapBuilder; +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.mapper.MapperService; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Map; + +/** + * CustomCodecService provides ZSTD and ZSTDNODICT compression codecs. + */ +public class CustomCodecService extends CodecService { + private final Map codecs; + + /** + * Creates a new CustomCodecService. + * + * @param mapperService A mapper service. + * @param logger A logger. + */ + public CustomCodecService(MapperService mapperService, Logger logger) { + super(mapperService, logger); + final MapBuilder codecs = MapBuilder.newMapBuilder(); + if (mapperService == null) { + codecs.put(Lucene95CustomCodec.Mode.ZSTD.name(), new ZstdCodec()); + codecs.put(Lucene95CustomCodec.Mode.ZSTDNODICT.name(), new ZstdNoDictCodec()); + } else { + codecs.put( + Lucene95CustomCodec.Mode.ZSTD.name(), + new PerFieldMappingPostingFormatCodec(Lucene95CustomCodec.Mode.ZSTD, mapperService) + ); + codecs.put( + Lucene95CustomCodec.Mode.ZSTDNODICT.name(), + new PerFieldMappingPostingFormatCodec(Lucene95CustomCodec.Mode.ZSTDNODICT, mapperService) + ); + } + this.codecs = codecs.immutableMap(); + } + + @Override + public Codec codec(String name) { + Codec codec = codecs.get(name); + if (codec == null) { + return super.codec(name); + } + return codec; + } + + @Override + public String[] availableCodecs() { + ArrayList ac = new ArrayList(Arrays.asList(super.availableCodecs())); + ac.addAll(codecs.keySet()); + return ac.toArray(new String[0]); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java new file mode 100644 index 0000000000000..9a1872abfcbd7 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/CustomCodecServiceFactory.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.codec.CodecService; +import org.opensearch.index.codec.CodecServiceConfig; +import org.opensearch.index.codec.CodecServiceFactory; + +/** + * A factory for creating new {@link CodecService} instance + */ +public class CustomCodecServiceFactory implements CodecServiceFactory { + + /** Creates a new instance. */ + public CustomCodecServiceFactory() {} + + @Override + public CodecService createCodecService(CodecServiceConfig config) { + return new CustomCodecService(config.getMapperService(), config.getLogger()); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java new file mode 100644 index 0000000000000..652306e59559b --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomCodec.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.FilterCodec; +import org.apache.lucene.codecs.lucene95.Lucene95Codec; + +abstract class Lucene95CustomCodec extends FilterCodec { + public static final int DEFAULT_COMPRESSION_LEVEL = 6; + + /** Each mode represents a compression algorithm. */ + public enum Mode { + ZSTD, + ZSTDNODICT + } + + private final StoredFieldsFormat storedFieldsFormat; + + /** new codec for a given compression algorithm and default compression level */ + public Lucene95CustomCodec(Mode mode) { + this(mode, DEFAULT_COMPRESSION_LEVEL); + } + + public Lucene95CustomCodec(Mode mode, int compressionLevel) { + super(mode.name(), new Lucene95Codec()); + this.storedFieldsFormat = new Lucene95CustomStoredFieldsFormat(mode, compressionLevel); + } + + @Override + public StoredFieldsFormat storedFieldsFormat() { + return storedFieldsFormat; + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java new file mode 100644 index 0000000000000..e0253516b6d0a --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/Lucene95CustomStoredFieldsFormat.java @@ -0,0 +1,107 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import java.io.IOException; +import java.util.Objects; +import org.apache.lucene.codecs.StoredFieldsFormat; +import org.apache.lucene.codecs.StoredFieldsReader; +import org.apache.lucene.codecs.StoredFieldsWriter; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.SegmentInfo; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IOContext; + +/** Stored field format used by pluggable codec */ +public class Lucene95CustomStoredFieldsFormat extends StoredFieldsFormat { + + /** A key that we use to map to a mode */ + public static final String MODE_KEY = Lucene95CustomStoredFieldsFormat.class.getSimpleName() + ".mode"; + + private static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024; + private static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096; + private static final int ZSTD_BLOCK_SHIFT = 10; + + private final CompressionMode zstdCompressionMode; + private final CompressionMode zstdNoDictCompressionMode; + + private final Lucene95CustomCodec.Mode mode; + + /** default constructor */ + public Lucene95CustomStoredFieldsFormat() { + this(Lucene95CustomCodec.Mode.ZSTD, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new instance. + * + * @param mode The mode represents ZSTD or ZSTDNODICT + */ + public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode) { + this(mode, Lucene95CustomCodec.DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new instance with the specified mode and compression level. + * + * @param mode The mode represents ZSTD or ZSTDNODICT + * @param compressionLevel The compression level for the mode. + */ + public Lucene95CustomStoredFieldsFormat(Lucene95CustomCodec.Mode mode, int compressionLevel) { + this.mode = Objects.requireNonNull(mode); + zstdCompressionMode = new ZstdCompressionMode(compressionLevel); + zstdNoDictCompressionMode = new ZstdNoDictCompressionMode(compressionLevel); + } + + @Override + public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException { + String value = si.getAttribute(MODE_KEY); + if (value == null) { + throw new IllegalStateException("missing value for " + MODE_KEY + " for segment: " + si.name); + } + Lucene95CustomCodec.Mode mode = Lucene95CustomCodec.Mode.valueOf(value); + return impl(mode).fieldsReader(directory, si, fn, context); + } + + @Override + public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context) throws IOException { + String previous = si.putAttribute(MODE_KEY, mode.name()); + if (previous != null && previous.equals(mode.name()) == false) { + throw new IllegalStateException( + "found existing value for " + MODE_KEY + " for segment: " + si.name + " old = " + previous + ", new = " + mode.name() + ); + } + return impl(mode).fieldsWriter(directory, si, context); + } + + private StoredFieldsFormat impl(Lucene95CustomCodec.Mode mode) { + switch (mode) { + case ZSTD: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsZstd", + zstdCompressionMode, + ZSTD_BLOCK_LENGTH, + ZSTD_MAX_DOCS_PER_BLOCK, + ZSTD_BLOCK_SHIFT + ); + case ZSTDNODICT: + return new Lucene90CompressingStoredFieldsFormat( + "CustomStoredFieldsZstdNoDict", + zstdNoDictCompressionMode, + ZSTD_BLOCK_LENGTH, + ZSTD_MAX_DOCS_PER_BLOCK, + ZSTD_BLOCK_SHIFT + ); + default: + throw new AssertionError(); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java new file mode 100644 index 0000000000000..f1c64853bca40 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/PerFieldMappingPostingFormatCodec.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.opensearch.index.mapper.MapperService; + +/** PerFieldMappingPostingFormatCodec. {@link org.opensearch.index.codec.PerFieldMappingPostingFormatCodec} */ +public class PerFieldMappingPostingFormatCodec extends Lucene95CustomCodec { + + /** + * Creates a new instance. + * + * @param compressionMode The compression mode (ZSTD or ZSTDNODICT). + * @param mapperService The mapper service. + */ + public PerFieldMappingPostingFormatCodec(Lucene95CustomCodec.Mode compressionMode, MapperService mapperService) { + super(compressionMode); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java new file mode 100644 index 0000000000000..086e2461b1f6a --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCodec.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +/** + * ZstdCodec provides ZSTD compressor using the zstd-jni library. + */ +public class ZstdCodec extends Lucene95CustomCodec { + + /** + * Creates a new ZstdCodec instance with the default compression level. + */ + public ZstdCodec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new ZstdCodec instance. + * + * @param compressionLevel The compression level. + */ + public ZstdCodec(int compressionLevel) { + super(Mode.ZSTD, compressionLevel); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java new file mode 100644 index 0000000000000..795ddf3ab2d17 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdCompressionMode.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import com.github.luben.zstd.Zstd; +import com.github.luben.zstd.ZstdCompressCtx; +import com.github.luben.zstd.ZstdDecompressCtx; +import com.github.luben.zstd.ZstdDictCompress; +import com.github.luben.zstd.ZstdDictDecompress; +import java.io.IOException; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +/** Zstandard Compression Mode */ +public class ZstdCompressionMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DICT_SIZE_FACTOR = 6; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + + /** default constructor */ + protected ZstdCompressionMode() { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + } + + /** + * Creates a new instance. + * + * @param compressionLevel The compression level to use. + */ + protected ZstdCompressionMode(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + @Override + public Compressor newCompressor() { + return new ZstdCompressor(compressionLevel); + } + + @Override + public Decompressor newDecompressor() { + return new ZstdDecompressor(); + } + + /** zstandard compressor */ + private static final class ZstdCompressor extends Compressor { + + private final int compressionLevel; + private byte[] compressedBuffer; + + /** compressor with a given compresion level */ + public ZstdCompressor(int compressionLevel) { + this.compressionLevel = compressionLevel; + compressedBuffer = BytesRef.EMPTY_BYTES; + } + + /*resuable compress function*/ + private void doCompress(byte[] bytes, int offset, int length, ZstdCompressCtx cctx, DataOutput out) throws IOException { + if (length == 0) { + out.writeVInt(0); + return; + } + final int maxCompressedLength = (int) Zstd.compressBound(length); + compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); + + int compressedSize = cctx.compressByteArray(compressedBuffer, 0, compressedBuffer.length, bytes, offset, length); + + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "offset value must be greater than 0"; + + final int dictLength = length / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR); + final int blockLength = (length - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; + out.writeVInt(dictLength); + out.writeVInt(blockLength); + + final int end = offset + length; + assert end >= 0 : "buffer read size must be greater than 0"; + + try (ZstdCompressCtx cctx = new ZstdCompressCtx()) { + cctx.setLevel(compressionLevel); + + // dictionary compression first + doCompress(bytes, offset, dictLength, cctx, out); + cctx.loadDict(new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)); + + for (int start = offset + dictLength; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + doCompress(bytes, start, l, cctx, out); + } + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, length, out); + } + + @Override + public void close() throws IOException {} + } + + /** zstandard decompressor */ + private static final class ZstdDecompressor extends Decompressor { + + private byte[] compressedBuffer; + + /** default decompressor */ + public ZstdDecompressor() { + compressedBuffer = BytesRef.EMPTY_BYTES; + } + + /*resuable decompress function*/ + private void doDecompress(DataInput in, ZstdDecompressCtx dctx, BytesRef bytes, int decompressedLen) throws IOException { + final int compressedLength = in.readVInt(); + if (compressedLength == 0) { + return; + } + + compressedBuffer = ArrayUtil.grow(compressedBuffer, compressedLength); + in.readBytes(compressedBuffer, 0, compressedLength); + + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + decompressedLen); + int uncompressed = dctx.decompressByteArray(bytes.bytes, bytes.length, decompressedLen, compressedBuffer, 0, compressedLength); + + if (decompressedLen != uncompressed) { + throw new IllegalStateException(decompressedLen + " " + uncompressed); + } + bytes.length += uncompressed; + } + + @Override + public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "buffer read size must be within limit"; + + if (length == 0) { + bytes.length = 0; + return; + } + final int dictLength = in.readVInt(); + final int blockLength = in.readVInt(); + bytes.bytes = ArrayUtil.grow(bytes.bytes, dictLength); + bytes.offset = bytes.length = 0; + + try (ZstdDecompressCtx dctx = new ZstdDecompressCtx()) { + + // decompress dictionary first + doDecompress(in, dctx, bytes, dictLength); + + dctx.loadDict(new ZstdDictDecompress(bytes.bytes, 0, dictLength)); + + int offsetInBlock = dictLength; + int offsetInBytesRef = offset; + + // Skip unneeded blocks + while (offsetInBlock + blockLength < offset) { + final int compressedLength = in.readVInt(); + in.skipBytes(compressedLength); + offsetInBlock += blockLength; + offsetInBytesRef -= blockLength; + } + + // Read blocks that intersect with the interval we need + while (offsetInBlock < offset + length) { + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); + int l = Math.min(blockLength, originalLength - offsetInBlock); + doDecompress(in, dctx, bytes, l); + offsetInBlock += blockLength; + } + + bytes.offset = offsetInBytesRef; + bytes.length = length; + + assert bytes.isValid() : "decompression output is corrupted"; + } + } + + @Override + public Decompressor clone() { + return new ZstdDecompressor(); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java new file mode 100644 index 0000000000000..c33ca1f4ff6e7 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCodec.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +/** + * ZstdNoDictCodec provides ZSTD compressor without a dictionary support. + */ +public class ZstdNoDictCodec extends Lucene95CustomCodec { + + /** + * Creates a new ZstdNoDictCodec instance with the default compression level. + */ + public ZstdNoDictCodec() { + this(DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a new ZstdNoDictCodec instance. + * + * @param compressionLevel The compression level. + */ + public ZstdNoDictCodec(int compressionLevel) { + super(Mode.ZSTDNODICT, compressionLevel); + } + + @Override + public String toString() { + return getClass().getSimpleName(); + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java new file mode 100644 index 0000000000000..61808191556f0 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressionMode.java @@ -0,0 +1,178 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import com.github.luben.zstd.Zstd; +import java.io.IOException; +import org.apache.lucene.codecs.compressing.CompressionMode; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.DataOutput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.util.ArrayUtil; +import org.apache.lucene.util.BytesRef; + +/** ZSTD Compression Mode (without a dictionary support). */ +public class ZstdNoDictCompressionMode extends CompressionMode { + + private static final int NUM_SUB_BLOCKS = 10; + private static final int DEFAULT_COMPRESSION_LEVEL = 6; + + private final int compressionLevel; + + /** default constructor */ + protected ZstdNoDictCompressionMode() { + this.compressionLevel = DEFAULT_COMPRESSION_LEVEL; + } + + /** + * Creates a new instance with the given compression level. + * + * @param compressionLevel The compression level. + */ + protected ZstdNoDictCompressionMode(int compressionLevel) { + this.compressionLevel = compressionLevel; + } + + @Override + public Compressor newCompressor() { + return new ZstdCompressor(compressionLevel); + } + + @Override + public Decompressor newDecompressor() { + return new ZstdDecompressor(); + } + + /** zstandard compressor */ + private static final class ZstdCompressor extends Compressor { + + private final int compressionLevel; + private byte[] compressedBuffer; + + /** compressor with a given compresion level */ + public ZstdCompressor(int compressionLevel) { + this.compressionLevel = compressionLevel; + compressedBuffer = BytesRef.EMPTY_BYTES; + } + + private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException { + assert offset >= 0 : "offset value must be greater than 0"; + + int blockLength = (length + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS; + out.writeVInt(blockLength); + + final int end = offset + length; + assert end >= 0 : "buffer read size must be greater than 0"; + + for (int start = offset; start < end; start += blockLength) { + int l = Math.min(blockLength, end - start); + + if (l == 0) { + out.writeVInt(0); + return; + } + + final int maxCompressedLength = (int) Zstd.compressBound(l); + compressedBuffer = ArrayUtil.grow(compressedBuffer, maxCompressedLength); + + int compressedSize = (int) Zstd.compressByteArray( + compressedBuffer, + 0, + compressedBuffer.length, + bytes, + start, + l, + compressionLevel + ); + + out.writeVInt(compressedSize); + out.writeBytes(compressedBuffer, compressedSize); + } + } + + @Override + public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException { + final int length = (int) buffersInput.size(); + byte[] bytes = new byte[length]; + buffersInput.readBytes(bytes, 0, length); + compress(bytes, 0, length, out); + } + + @Override + public void close() throws IOException {} + } + + /** zstandard decompressor */ + private static final class ZstdDecompressor extends Decompressor { + + private byte[] compressed; + + /** default decompressor */ + public ZstdDecompressor() { + compressed = BytesRef.EMPTY_BYTES; + } + + @Override + public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException { + assert offset + length <= originalLength : "buffer read size must be within limit"; + + if (length == 0) { + bytes.length = 0; + return; + } + + final int blockLength = in.readVInt(); + bytes.offset = bytes.length = 0; + int offsetInBlock = 0; + int offsetInBytesRef = offset; + + // Skip unneeded blocks + while (offsetInBlock + blockLength < offset) { + final int compressedLength = in.readVInt(); + in.skipBytes(compressedLength); + offsetInBlock += blockLength; + offsetInBytesRef -= blockLength; + } + + // Read blocks that intersect with the interval we need + while (offsetInBlock < offset + length) { + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength); + final int compressedLength = in.readVInt(); + if (compressedLength == 0) { + return; + } + compressed = ArrayUtil.grow(compressed, compressedLength); + in.readBytes(compressed, 0, compressedLength); + + int l = Math.min(blockLength, originalLength - offsetInBlock); + bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + l); + + byte[] output = new byte[l]; + + final int uncompressed = (int) Zstd.decompressByteArray(output, 0, l, compressed, 0, compressedLength); + System.arraycopy(output, 0, bytes.bytes, bytes.length, uncompressed); + + bytes.length += uncompressed; + offsetInBlock += blockLength; + } + + bytes.offset = offsetInBytesRef; + bytes.length = length; + + assert bytes.isValid() : "decompression output is corrupted."; + } + + @Override + public Decompressor clone() { + return new ZstdDecompressor(); + } + } +} diff --git a/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java new file mode 100644 index 0000000000000..e996873963b1b --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/java/org/opensearch/index/codec/customcodecs/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * A plugin that implements compression codecs with native implementation. + */ +package org.opensearch.index.codec.customcodecs; diff --git a/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy b/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..8161010cfa897 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +grant codeBase "${codebase.zstd-jni}" { + permission java.lang.RuntimePermission "loadLibrary.*"; +}; diff --git a/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec new file mode 100644 index 0000000000000..8b37d91cd8bc4 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec @@ -0,0 +1,2 @@ +org.opensearch.index.codec.customcodecs.ZstdCodec +org.opensearch.index.codec.customcodecs.ZstdNoDictCodec diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java new file mode 100644 index 0000000000000..fcfb06ca6b050 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/AbstractCompressorTests.java @@ -0,0 +1,219 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.tests.util.LineFileDocs; +import org.apache.lucene.tests.util.TestUtil; +import org.opensearch.test.OpenSearchTestCase; +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; +import org.apache.lucene.store.ByteArrayDataInput; +import org.apache.lucene.store.ByteBuffersDataInput; +import org.apache.lucene.store.ByteBuffersDataOutput; +import org.apache.lucene.util.BytesRef; + +import java.util.List; +import java.nio.ByteBuffer; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Random; + +/** + * Test cases for compressors (based on {@See org.opensearch.common.compress.DeflateCompressTests}). + */ +public abstract class AbstractCompressorTests extends OpenSearchTestCase { + + abstract Compressor compressor(); + + abstract Decompressor decompressor(); + + public void testEmpty() throws IOException { + final byte[] bytes = "".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testShortLiterals() throws IOException { + final byte[] bytes = "1234567345673456745608910123".getBytes(StandardCharsets.UTF_8); + doTest(bytes); + } + + public void testRandom() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + final byte[] bytes = new byte[TestUtil.nextInt(r, 1, 100000)]; + r.nextBytes(bytes); + doTest(bytes); + } + } + + public void testLineDocs() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 10; i++) { + int numDocs = TestUtil.nextInt(r, 1, 200); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + for (int j = 0; j < numDocs; j++) { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + doTest(bos.toByteArray()); + } + lineFileDocs.close(); + } + + public void testRepetitionsL() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numLongs = TestUtil.nextInt(r, 1, 10000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + long theValue = r.nextLong(); + for (int j = 0; j < numLongs; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsI() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numInts = TestUtil.nextInt(r, 1, 20000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int theValue = r.nextInt(); + for (int j = 0; j < numInts; j++) { + if (r.nextInt(10) == 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testRepetitionsS() throws IOException { + Random r = random(); + for (int i = 0; i < 10; i++) { + int numShorts = TestUtil.nextInt(r, 1, 40000); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + short theValue = (short) r.nextInt(65535); + for (int j = 0; j < numShorts; j++) { + if (r.nextInt(10) == 0) { + theValue = (short) r.nextInt(65535); + } + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + doTest(bos.toByteArray()); + } + } + + public void testMixed() throws IOException { + Random r = random(); + LineFileDocs lineFileDocs = new LineFileDocs(r); + for (int i = 0; i < 2; ++i) { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + int prevInt = r.nextInt(); + long prevLong = r.nextLong(); + while (bos.size() < 400000) { + switch (r.nextInt(4)) { + case 0: + addInt(r, prevInt, bos); + break; + case 1: + addLong(r, prevLong, bos); + break; + case 2: + addString(lineFileDocs, bos); + break; + case 3: + addBytes(r, bos); + break; + default: + throw new IllegalStateException("Random is broken"); + } + } + doTest(bos.toByteArray()); + } + } + + private void addLong(Random r, long prev, ByteArrayOutputStream bos) { + long theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextLong(); + } + bos.write((byte) (theValue >>> 56)); + bos.write((byte) (theValue >>> 48)); + bos.write((byte) (theValue >>> 40)); + bos.write((byte) (theValue >>> 32)); + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addInt(Random r, int prev, ByteArrayOutputStream bos) { + int theValue = prev; + if (r.nextInt(10) != 0) { + theValue = r.nextInt(); + } + bos.write((byte) (theValue >>> 24)); + bos.write((byte) (theValue >>> 16)); + bos.write((byte) (theValue >>> 8)); + bos.write((byte) theValue); + } + + private void addString(LineFileDocs lineFileDocs, ByteArrayOutputStream bos) throws IOException { + String s = lineFileDocs.nextDoc().get("body"); + bos.write(s.getBytes(StandardCharsets.UTF_8)); + } + + private void addBytes(Random r, ByteArrayOutputStream bos) throws IOException { + byte bytes[] = new byte[TestUtil.nextInt(r, 1, 10000)]; + r.nextBytes(bytes); + bos.write(bytes); + } + + private void doTest(byte[] bytes) throws IOException { + final int length = bytes.length; + + ByteBuffersDataInput in = new ByteBuffersDataInput(List.of(ByteBuffer.wrap(bytes))); + ByteBuffersDataOutput out = new ByteBuffersDataOutput(); + + // let's compress + Compressor compressor = compressor(); + compressor.compress(in, out); + byte[] compressed = out.toArrayCopy(); + + // let's decompress + BytesRef outbytes = new BytesRef(); + Decompressor decompressor = decompressor(); + decompressor.decompress(new ByteArrayDataInput(compressed), length, 0, length, outbytes); + + // get the uncompressed array out of outbytes + byte[] restored = new byte[outbytes.length]; + System.arraycopy(outbytes.bytes, 0, restored, 0, outbytes.length); + + assertArrayEquals(bytes, restored); + } + +} diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java new file mode 100644 index 0000000000000..78cf62c08f889 --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdCompressorTests.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test ZSTD compression (with dictionary enabled) + */ +public class ZstdCompressorTests extends AbstractCompressorTests { + + private final Compressor compressor = new ZstdCompressionMode().newCompressor(); + private final Decompressor decompressor = new ZstdCompressionMode().newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +} diff --git a/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java new file mode 100644 index 0000000000000..2eda81a6af2ab --- /dev/null +++ b/sandbox/modules/custom-codecs/src/test/java/org/opensearch/index/codec/customcodecs/ZstdNoDictCompressorTests.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.index.codec.customcodecs; + +import org.apache.lucene.codecs.compressing.Compressor; +import org.apache.lucene.codecs.compressing.Decompressor; + +/** + * Test ZSTD compression (with no dictionary). + */ +public class ZstdNoDictCompressorTests extends AbstractCompressorTests { + + private final Compressor compressor = new ZstdNoDictCompressionMode().newCompressor(); + private final Decompressor decompressor = new ZstdNoDictCompressionMode().newDecompressor(); + + @Override + Compressor compressor() { + return compressor; + } + + @Override + Decompressor decompressor() { + return decompressor; + } +}