ZSTD codec #679

Closed
wants to merge 1 commit
14 changes: 14 additions & 0 deletions kaldb/pom.xml
@@ -27,6 +27,7 @@
<jackson.databind.version>2.15.1</jackson.databind.version>
<lucene.version>9.5.0</lucene.version>
<opensearch.version>2.7.0</opensearch.version>
<zstd.jni.version>1.5.5-5</zstd.jni.version>
<curator.version>5.5.0</curator.version>
<log4j.version>2.20.0</log4j.version>
<aws.sdk.version>2.20.116</aws.sdk.version>
@@ -188,6 +189,19 @@
<version>${opensearch.version}</version>
</dependency>

<dependency>
<groupId>org.opensearch.plugin</groupId>
<artifactId>lang-painless</artifactId>
<version>${opensearch.version}</version>
</dependency>

<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>${zstd.jni.version}</version>
<classifier>darwin_aarch64</classifier>
</dependency>

<!-- Kafka writer dependencies -->
<dependency>
<groupId>org.apache.kafka</groupId>
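The zstd-jni artifact above supplies the JNI bindings the new codec relies on; the darwin_aarch64 classifier selects the jar that bundles only the macOS/ARM native library. A minimal smoke-test sketch (illustrative, not part of this change; the class name is made up) exercising the bindings directly:

import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ZstdJniSmokeTest {
  public static void main(String[] args) {
    byte[] input = "hello zstd".getBytes(StandardCharsets.UTF_8);
    // One-shot compress/decompress round trip through the native bindings.
    byte[] compressed = Zstd.compress(input, 3); // compression level 3
    byte[] restored = Zstd.decompress(compressed, input.length);
    System.out.println(Arrays.equals(input, restored)); // expect: true
  }
}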
@@ -1,5 +1,6 @@
package com.slack.kaldb.logstore;

import com.slack.kaldb.logstore.index.codec.ZstdCodec;
import com.slack.kaldb.logstore.schema.SchemaAwareLogDocumentBuilderImpl;
import com.slack.kaldb.metadata.schema.LuceneFieldDef;
import com.slack.kaldb.proto.config.KaldbConfigs;
@@ -178,6 +179,7 @@ private IndexWriterConfig buildIndexWriterConfig(
.setMergeScheduler(new KalDBMergeScheduler(metricsRegistry))
.setRAMBufferSizeMB(ramBufferSizeMb)
.setUseCompoundFile(useCFSFiles)
.setCodec(new ZstdCodec())
// we sort by timestamp descending, as that is the order we expect to return results the
// majority of the time
.setIndexSort(
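Setting the codec on the IndexWriterConfig above only controls how new segments are written; when a segment is opened for reading, Lucene resolves the codec by the name recorded in the segment ("CustomCodec" in this change) through its SPI. A hedged sketch of that lookup (class name is illustrative):

import org.apache.lucene.codecs.Codec;

public class CodecLookupSketch {
  public static void main(String[] args) {
    // Succeeds only if a META-INF/services/org.apache.lucene.codecs.Codec entry lists
    // ZstdCodec; whether this change includes such a registration is not visible in this diff.
    Codec codec = Codec.forName("CustomCodec");
    System.out.println(codec); // ZstdCodec overrides toString() to print "ZstdCodec"
  }
}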
@@ -0,0 +1,76 @@
package com.slack.kaldb.logstore.index.codec;

import java.io.IOException;
import org.apache.lucene.codecs.FilterCodec;
import org.apache.lucene.codecs.StoredFieldsFormat;
import org.apache.lucene.codecs.StoredFieldsReader;
import org.apache.lucene.codecs.StoredFieldsWriter;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
import org.apache.lucene.codecs.lucene95.Lucene95Codec;
import org.apache.lucene.index.FieldInfos;
import org.apache.lucene.index.SegmentInfo;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;

// Inspired by https://github.com/opensearch-project/custom-codecs/
// We can reuse that and remove this once custom-codecs has a release
// Update this when updating Lucene versions
public final class ZstdCodec extends FilterCodec {

private final ZstdStoredFieldsFormat storedFieldsFormat;

public ZstdCodec() {
super("CustomCodec", new Lucene95Codec());
this.storedFieldsFormat = new ZstdStoredFieldsFormat();
}

@Override
public StoredFieldsFormat storedFieldsFormat() {
return storedFieldsFormat;
}

@Override
public String toString() {
return getClass().getSimpleName();
}

class ZstdStoredFieldsFormat extends StoredFieldsFormat {

private static final int ZSTD_BLOCK_LENGTH = 10 * 48 * 1024;
private static final int ZSTD_MAX_DOCS_PER_BLOCK = 4096;
private static final int ZSTD_BLOCK_SHIFT = 10;
public static final int DEFAULT_COMPRESSION_LEVEL = 3;

private final CompressionMode zstdCompressionMode;

private ZstdStoredFieldsFormat() {
zstdCompressionMode = new ZstdCompressionMode(DEFAULT_COMPRESSION_LEVEL);
}

@Override
public StoredFieldsReader fieldsReader(
Directory directory, SegmentInfo si, FieldInfos fn, IOContext context) throws IOException {
Lucene90CompressingStoredFieldsFormat impl =
new Lucene90CompressingStoredFieldsFormat(
"CustomStoredFieldsZstd",
zstdCompressionMode,
ZSTD_BLOCK_LENGTH,
ZSTD_MAX_DOCS_PER_BLOCK,
ZSTD_BLOCK_SHIFT);
return impl.fieldsReader(directory, si, fn, context);
}

@Override
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo si, IOContext context)
throws IOException {
Lucene90CompressingStoredFieldsFormat impl =
new Lucene90CompressingStoredFieldsFormat(
"CustomStoredFieldsZstd",
zstdCompressionMode,
ZSTD_BLOCK_LENGTH,
ZSTD_MAX_DOCS_PER_BLOCK,
ZSTD_BLOCK_SHIFT);
return impl.fieldsWriter(directory, si, context);
}
}
}
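For illustration, a minimal write-and-read-back sketch using the codec above (class and field names are arbitrary; reading back assumes the SPI registration mentioned earlier):

import com.slack.kaldb.logstore.index.codec.ZstdCodec;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class ZstdCodecRoundTrip {
  public static void main(String[] args) throws Exception {
    try (Directory dir = new ByteBuffersDirectory()) {
      IndexWriterConfig config = new IndexWriterConfig().setCodec(new ZstdCodec());
      try (IndexWriter writer = new IndexWriter(dir, config)) {
        Document doc = new Document();
        // Stored fields are what ZstdStoredFieldsFormat compresses.
        doc.add(new StoredField("message", "hello zstd codec"));
        writer.addDocument(doc);
      }
      try (DirectoryReader reader = DirectoryReader.open(dir)) {
        System.out.println(reader.document(0).get("message")); // prints "hello zstd codec"
      }
    }
  }
}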
@@ -0,0 +1,211 @@
package com.slack.kaldb.logstore.index.codec;

import com.github.luben.zstd.Zstd;
import com.github.luben.zstd.ZstdCompressCtx;
import com.github.luben.zstd.ZstdDecompressCtx;
import com.github.luben.zstd.ZstdDictCompress;
import com.github.luben.zstd.ZstdDictDecompress;
import java.io.IOException;
import org.apache.lucene.codecs.compressing.CompressionMode;
import org.apache.lucene.codecs.compressing.Compressor;
import org.apache.lucene.codecs.compressing.Decompressor;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;

/** Zstandard Compression Mode */
public class ZstdCompressionMode extends CompressionMode {

private static final int NUM_SUB_BLOCKS = 10;
private static final int DICT_SIZE_FACTOR = 6;
private static final int DEFAULT_COMPRESSION_LEVEL = 6;

private final int compressionLevel;

/** default constructor */
protected ZstdCompressionMode() {
this.compressionLevel = DEFAULT_COMPRESSION_LEVEL;
}

/**
* Creates a new instance.
*
* @param compressionLevel The compression level to use.
*/
protected ZstdCompressionMode(int compressionLevel) {
this.compressionLevel = compressionLevel;
}

/** Creates a new compressor instance. */
@Override
public Compressor newCompressor() {
return new ZstdCompressor(compressionLevel);
}

/** Creates a new decompressor instance. */
@Override
public Decompressor newDecompressor() {
return new ZstdDecompressor();
}

/** zstandard compressor */
private static final class ZstdCompressor extends Compressor {

private final int compressionLevel;
private byte[] compressedBuffer;

/** compressor with a given compression level */
public ZstdCompressor(int compressionLevel) {
this.compressionLevel = compressionLevel;
compressedBuffer = BytesRef.EMPTY_BYTES;
}

/* reusable compress function */
private void doCompress(
byte[] bytes, int offset, int length, ZstdCompressCtx cctx, DataOutput out)
throws IOException {
if (length == 0) {
out.writeVInt(0);
return;
}
final int maxCompressedLength = (int) Zstd.compressBound(length);
compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, maxCompressedLength);

int compressedSize =
cctx.compressByteArray(
compressedBuffer, 0, compressedBuffer.length, bytes, offset, length);

out.writeVInt(compressedSize);
out.writeBytes(compressedBuffer, compressedSize);
}

private void compress(byte[] bytes, int offset, int length, DataOutput out) throws IOException {
assert offset >= 0 : "offset must not be negative";

final int dictLength = length / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR);
final int blockLength = (length - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS;
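// What follows on disk: a vint dictLength, a vint blockLength, then one zstd frame for the
// dictionary slice and one per sub-block, each prefixed by its vint compressed size (see
// doCompress). Worked example (illustrative): for length = 60_000, dictLength = 60_000 / 60
// = 1_000 and blockLength = ceil(59_000 / 10) = 5_900, i.e. ten 5_900-byte blocks after the
// dictionary frame.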
out.writeVInt(dictLength);
out.writeVInt(blockLength);

final int end = offset + length;
assert end >= 0 : "end offset must not be negative";

try (ZstdCompressCtx cctx = new ZstdCompressCtx()) {
cctx.setLevel(compressionLevel);

// dictionary compression first
doCompress(bytes, offset, dictLength, cctx, out);
try (ZstdDictCompress dictCompress =
new ZstdDictCompress(bytes, offset, dictLength, compressionLevel)) {
cctx.loadDict(dictCompress);

for (int start = offset + dictLength; start < end; start += blockLength) {
int l = Math.min(blockLength, end - start);
doCompress(bytes, start, l, cctx, out);
}
}
}
}

@Override
public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
final int length = (int) buffersInput.size();
byte[] bytes = new byte[length];
buffersInput.readBytes(bytes, 0, length);
compress(bytes, 0, length, out);
}

@Override
public void close() throws IOException {}
}

/** zstandard decompressor */
private static final class ZstdDecompressor extends Decompressor {

private byte[] compressedBuffer;

/** default decompressor */
public ZstdDecompressor() {
compressedBuffer = BytesRef.EMPTY_BYTES;
}

/* reusable decompress function */
private void doDecompress(
DataInput in, ZstdDecompressCtx dctx, BytesRef bytes, int decompressedLen)
throws IOException {
final int compressedLength = in.readVInt();
if (compressedLength == 0) {
return;
}

compressedBuffer = ArrayUtil.growNoCopy(compressedBuffer, compressedLength);
in.readBytes(compressedBuffer, 0, compressedLength);

bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + decompressedLen);
int uncompressed =
dctx.decompressByteArray(
bytes.bytes, bytes.length, decompressedLen, compressedBuffer, 0, compressedLength);

if (decompressedLen != uncompressed) {
throw new IllegalStateException(
"Expected " + decompressedLen + " decompressed bytes but got " + uncompressed);
}
bytes.length += uncompressed;
}

@Override
public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes)
throws IOException {
assert offset + length <= originalLength : "buffer read size must be within limit";

if (length == 0) {
bytes.length = 0;
return;
}
final int dictLength = in.readVInt();
final int blockLength = in.readVInt();
bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, dictLength);
bytes.offset = bytes.length = 0;

try (ZstdDecompressCtx dctx = new ZstdDecompressCtx()) {

// decompress dictionary first
doDecompress(in, dctx, bytes, dictLength);
try (ZstdDictDecompress dictDecompress =
new ZstdDictDecompress(bytes.bytes, 0, dictLength)) {
dctx.loadDict(dictDecompress);

int offsetInBlock = dictLength;
int offsetInBytesRef = offset;

// Skip unneeded blocks
while (offsetInBlock + blockLength < offset) {
final int compressedLength = in.readVInt();
in.skipBytes(compressedLength);
offsetInBlock += blockLength;
offsetInBytesRef -= blockLength;
}

// Read blocks that intersect with the interval we need
while (offsetInBlock < offset + length) {
bytes.bytes = ArrayUtil.grow(bytes.bytes, bytes.length + blockLength);
int l = Math.min(blockLength, originalLength - offsetInBlock);
doDecompress(in, dctx, bytes, l);
offsetInBlock += blockLength;
}

bytes.offset = offsetInBytesRef;
bytes.length = length;

assert bytes.isValid() : "decompression output is corrupted";
}
}
}

@Override
public Decompressor clone() {
return new ZstdDecompressor();
}
}
}
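A standalone sketch (not part of the change; class name is made up) that exercises the compressor and decompressor above directly through Lucene's buffer abstractions, placed in the same package so the protected constructor is reachable:

package com.slack.kaldb.logstore.index.codec;

import java.nio.charset.StandardCharsets;
import org.apache.lucene.codecs.compressing.Compressor;
import org.apache.lucene.codecs.compressing.Decompressor;
import org.apache.lucene.store.ByteBuffersDataOutput;
import org.apache.lucene.util.BytesRef;

public class ZstdCompressionModeRoundTrip {
  public static void main(String[] args) throws Exception {
    byte[] payload =
        "log line payload that should compress well ".repeat(200).getBytes(StandardCharsets.UTF_8);

    ZstdCompressionMode mode = new ZstdCompressionMode(3);
    Compressor compressor = mode.newCompressor();
    Decompressor decompressor = mode.newDecompressor();

    // Hand the payload over the same way Lucene's stored-fields writer would.
    ByteBuffersDataOutput source = new ByteBuffersDataOutput();
    source.writeBytes(payload, 0, payload.length);
    ByteBuffersDataOutput compressed = new ByteBuffersDataOutput();
    compressor.compress(source.toDataInput(), compressed);
    compressor.close();

    // Decompress the full range and verify it matches the input.
    BytesRef restored = new BytesRef();
    decompressor.decompress(compressed.toDataInput(), payload.length, 0, payload.length, restored);
    System.out.println(new BytesRef(payload).bytesEquals(restored)); // expect: true
  }
}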