diff --git a/kaldb/pom.xml b/kaldb/pom.xml
index d7d3198d94..a9da5f10ff 100644
--- a/kaldb/pom.xml
+++ b/kaldb/pom.xml
@@ -27,6 +27,7 @@
     2.15.1
     9.5.0
     2.7.0
+    <zstd.jni.version>1.5.5-5</zstd.jni.version>
     5.5.0
     2.20.0
     2.20.116
@@ -188,6 +189,19 @@
       <version>${opensearch.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.opensearch.plugin</groupId>
+      <artifactId>lang-painless</artifactId>
+      <version>${opensearch.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <version>${zstd.jni.version}</version>
+      <classifier>darwin_aarch64</classifier>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.kafka</groupId>
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java
index 60335b9e68..716e21e608 100644
--- a/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/LuceneIndexStoreImpl.java
@@ -1,5 +1,6 @@
 package com.slack.kaldb.logstore;
 
+import com.slack.kaldb.logstore.index.codec.ZstdCodec;
 import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl;
 import com.slack.kaldb.metadata.schema.LuceneFieldDef;
 import com.slack.kaldb.proto.config.KaldbConfigs;
@@ -178,6 +179,7 @@ private IndexWriterConfig buildIndexWriterConfig(
         .setMergeScheduler(new KalDBMergeScheduler(metricsRegistry))
         .setRAMBufferSizeMB(ramBufferSizeMb)
         .setUseCompoundFile(useCFSFiles)
+        .setCodec(new ZstdCodec())
         // we sort by timestamp descending, as that is the order we expect to return results the
         // majority of the time
         .setIndexSort(
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/index/codec/ZstdCodec.java b/kaldb/src/main/java/com/slack/kaldb/logstore/index/codec/ZstdCodec.java
new file mode 100644
index 0000000000..eabb1b80b9
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/index/codec/ZstdCodec.java
@@ -0,0 +1,76 @@
+package com.slack.kaldb.logstore.index.codec;
+
+import java.io.IOException;
+import org.apache.lucene.codecs.FilterCodec;
+import org.apache.lucene.codecs.StoredFieldsFormat;
+import org.apache.lucene.codecs.StoredFieldsReader;
+import org.apache.lucene.codecs.StoredFieldsWriter;
+import org.apache.lucene.codecs.compressing.CompressionMode;
+import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
+import org.apache.lucene.codecs.lucene95.Lucene95Codec;
+import org.apache.lucene.index.FieldInfos;
+import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.IOContext;
+
+// Inspired by https://github.com/opensearch-project/custom-codecs/
+// We can reuse that and remove this once custom-codecs has a release.
+// Update this when updating Lucene versions.
+public final class ZstdCodec extends FilterCodec {
+
+  private final ZstdStoredFieldsFormat storedFieldsFormat;
+
+  public ZstdCodec() {
+    super("CustomCodec", new Lucene95Codec());
+    this.storedFieldsFormat = new ZstdStoredFieldsFormat();
+  }
+
+  public StoredFieldsFormat storedFieldsFormat() {
+    return storedFieldsFormat;
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName();
+  }
+
+  class ZstdStoredFieldsFormat extends StoredFieldsFormat {
+
+    private static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024;
+    private static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096;
+    private static final int ZSTD_BLOCK_SHIFT = 10;
+    public static final int DEFAULT_COMPRESSION_LEVEL = 3;
+
+    private final CompressionMode zstdCompressionMode;
+
+    private ZstdStoredFieldsFormat() {
+      zstdCompressionMode = new ZstdCompressionMode(DEFAULT_COMPRESSION_LEVEL);
+    }
+
+    @Override
+    public StoredFieldsReader fieldsReader(
+        Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException {
+      Lucene90CompressingStoredFieldsFormat impl =
+          new Lucene90CompressingStoredFieldsFormat(
+              "CustomStoredFieldsZstd",
+              zstdCompressionMode,
+              ZSTD_BLOCK_LENGTH,
+              ZSTD_MAX_DOCS_PER_BLOCK,
+              ZSTD_BLOCK_SHIFT);
+      return impl.fieldsReader(directory, si, fn, context);
+    }
+
+    @Override
+    public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context)
+        throws IOException {
+      Lucene90CompressingStoredFieldsFormat impl =
+          new Lucene90CompressingStoredFieldsFormat(
+              "CustomStoredFieldsZstd",
+              zstdCompressionMode,
+              ZSTD_BLOCK_LENGTH,
+              ZSTD_MAX_DOCS_PER_BLOCK,
+              ZSTD_BLOCK_SHIFT);
+      return impl.fieldsWriter(directory, si, context);
+    }
+  }
+}
diff --git a/kaldb/src/main/java/com/slack/kaldb/logstore/index/codec/ZstdCompressionMode.java b/kaldb/src/main/java/com/slack/kaldb/logstore/index/codec/ZstdCompressionMode.java
new file mode 100644
index 0000000000..88fa386cce
--- /dev/null
+++ b/kaldb/src/main/java/com/slack/kaldb/logstore/index/codec/ZstdCompressionMode.java
@@ -0,0 +1,211 @@
+package com.slack.kaldb.logstore.index.codec;
+
+import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdCompressCtx;
+import com.github.luben.zstd.ZstdDecompressCtx;
+import com.github.luben.zstd.ZstdDictCompress;
+import com.github.luben.zstd.ZstdDictDecompress;
+import java.io.IOException;
+import org.apache.lucene.codecs.compressing.CompressionMode;
+import org.apache.lucene.codecs.compressing.Compressor;
+import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.store.ByteBuffersDataInput;
+import org.apache.lucene.store.DataInput;
+import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.BytesRef;
+
+/** Zstandard compression mode. */
+public class ZstdCompressionMode extends CompressionMode {
+
+  private static final int NUM_SUB_BLOCKS = 10;
+  private static final int DICT_SIZE_FACTOR = 6;
+  private static final int DEFAULT_COMPRESSION_LEVEL = 6;
+
+  private final int compressionLevel;
+
+  /** Default constructor. */
+  protected ZstdCompressionMode() {
+    this.compressionLevel = DEFAULT_COMPRESSION_LEVEL;
+  }
+
+  /**
+   * Creates a new instance.
+   *
+   * @param compressionLevel The compression level to use.
+   */
+  protected ZstdCompressionMode(int compressionLevel) {
+    this.compressionLevel = compressionLevel;
+  }
+
+  /** Creates a new compressor instance. */
+  @Override
+  public Compressor newCompressor() {
+    return new ZstdCompressor(compressionLevel);
+  }
+
+  /** Creates a new decompressor instance. */
+  @Override
+  public Decompressor newDecompressor() {
+    return new ZstdDecompressor();
+  }
+
+  /** Zstandard compressor. */
+  private static final class ZstdCompressor extends Compressor {
+
+    private final int compressionLevel;
+    private byte[] compressedBuffer;
+
+    /** Compressor with a given compression level. */
+    public ZstdCompressor(int compressionLevel) {
+      this.compressionLevel = compressionLevel;
+      compressedBuffer = BytesRef.EMPTY_BYTES;
+    }
+
+    /* Reusable compress function. */
+    private void doCompress(
+        byte[] bytes, int offset, int length, ZstdCompressCtx cctx, DataOutput out)
+        throws IOException {
+      if (length == 0) {
+        out.writeVInt(0);
+        return;
+      }
+      final int maxCompressedLength = (int) Zstd.compressBound(length);
+      compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, maxCompressedLength);
+
+      int compressedSize =
+          cctx.compressByteArray(
+              compressedBuffer, 0, compressedBuffer.length, bytes, offset, length);
+
+      out.writeVInt(compressedSize);
+      out.writeBytes(compressedBuffer, compressedSize);
+    }
+
+    private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException {
+      assert offset >= 0 : "offset must be greater than or equal to 0";
+
+      final int dictLength = length / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR);
+      final int blockLength = (length - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS;
+      out.writeVInt(dictLength);
+      out.writeVInt(blockLength);
+
+      final int end = offset + length;
+      assert end >= 0 : "buffer read size must not overflow";
+
+      try (ZstdCompressCtx cctx = new ZstdCompressCtx()) {
+        cctx.setLevel(compressionLevel);
+
+        // dictionary compression first
+        doCompress(bytes, offset, dictLength, cctx, out);
+        try (ZstdDictCompress dictCompress =
+            new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)) {
+          cctx.loadDict(dictCompress);
+
+          for (int start = offset + dictLength; start < end; start += blockLength) {
+            int l = Math.min(blockLength, end - start);
+            doCompress(bytes, start, l, cctx, out);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+      final int length = (int) buffersInput.size();
+      byte[] bytes = new byte[length];
+      buffersInput.readBytes(bytes, 0, length);
+      compress(bytes, 0, length, out);
+    }
+
+    @Override
+    public void close() throws IOException {}
+  }
+
+  /** Zstandard decompressor. */
+  private static final class ZstdDecompressor extends Decompressor {
+
+    private byte[] compressedBuffer;
+
+    /** Default decompressor. */
+    public ZstdDecompressor() {
+      compressedBuffer = BytesRef.EMPTY_BYTES;
+    }
+
+    /* Reusable decompress function. */
+    private void doDecompress(
+        DataInput in, ZstdDecompressCtx dctx, BytesRef bytes, int decompressedLen)
+        throws IOException {
+      final int compressedLength = in.readVInt();
+      if (compressedLength == 0) {
+        return;
+      }
+
+      compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, compressedLength);
+      in.readBytes(compressedBuffer, 0, compressedLength);
+
+      bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + decompressedLen);
+      int uncompressed =
+          dctx.decompressByteArray(
+              bytes.bytes, bytes.length, decompressedLen, compressedBuffer, 0, compressedLength);
+
+      if (decompressedLen != uncompressed) {
+        throw new IllegalStateException(decompressedLen + " " + uncompressed);
+      }
+      bytes.length += uncompressed;
+    }
+
+    @Override
+    public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes)
+        throws IOException {
+      assert offset + length <= originalLength : "buffer read size must be within limit";
+
+      if (length == 0) {
+        bytes.length = 0;
+        return;
+      }
+      final int dictLength = in.readVInt();
+      final int blockLength = in.readVInt();
+      bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, dictLength);
+      bytes.offset = bytes.length = 0;
+
+      try (ZstdDecompressCtx dctx = new ZstdDecompressCtx()) {
+
+        // decompress dictionary first
+        doDecompress(in, dctx, bytes, dictLength);
+        try (ZstdDictDecompress dictDecompress =
+            new ZstdDictDecompress(bytes.bytes, 0, dictLength)) {
+          dctx.loadDict(dictDecompress);
+
+          int offsetInBlock = dictLength;
+          int offsetInBytesRef = offset;
+
+          // Skip unneeded blocks
+          while (offsetInBlock + blockLength < offset) {
+            final int compressedLength = in.readVInt();
+            in.skipBytes(compressedLength);
+            offsetInBlock += blockLength;
+            offsetInBytesRef -= blockLength;
+          }
+
+          // Read blocks that intersect with the interval we need
+          while (offsetInBlock < offset + length) {
+            bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
+            int l = Math.min(blockLength, originalLength - offsetInBlock);
+            doDecompress(in, dctx, bytes, l);
+            offsetInBlock += blockLength;
+          }
+
+          bytes.offset = offsetInBytesRef;
+          bytes.length = length;
+
+          assert bytes.isValid() : "decompression output is corrupted";
+        }
+      }
+    }
+
+    @Override
+    public Decompressor clone() {
+      return new ZstdDecompressor();
+    }
+  }
+}