Skip to content

Commit

Permalink
Create serde utility for Writable classes (opensearch-project#14095) (o…
Browse files Browse the repository at this point in the history
…pensearch-project#14133)

* Create serde utility for Writable classes

(cherry picked from commit 7f0ff14)

Signed-off-by: Sooraj Sinha <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: kkewwei <[email protected]>
  • Loading branch information
2 people authored and kkewwei committed Jul 24, 2024
1 parent d4ed2e8 commit 632c671
Show file tree
Hide file tree
Showing 3 changed files with 187 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ public static Compressor compressor(final BytesReference bytes) {
return null;
}

/**
* @param bytes The bytes to check the compression for
* @return The detected compressor. If no compressor detected then return NoneCompressor.
*/
public static Compressor compressorForWritable(final BytesReference bytes) {
for (Compressor compressor : registeredCompressors.values()) {
if (compressor.isCompressed(bytes) == true) {
return compressor;
}
}
return CompressorRegistry.none();
}

/** Decompress the provided {@link BytesReference}. */
public static BytesReference uncompress(BytesReference bytes) throws IOException {
Compressor compressor = compressor(bytes);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.blobstore;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.gateway.CorruptStateException;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

/**
* Checksum File format used to serialize/deserialize {@link Writeable} objects
*
* @opensearch.internal
*/
public class ChecksumWritableBlobStoreFormat<T extends Writeable> {

public static final int VERSION = 1;

private static final int BUFFER_SIZE = 4096;

private final String codec;
private final CheckedFunction<StreamInput, T, IOException> reader;

public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput, T, IOException> reader) {
this.codec = codec;
this.reader = reader;
}

public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")",
blobName,
outputStream,
BUFFER_SIZE
)
) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);

try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
}
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
// TODO The stream version should be configurable
stream.setVersion(Version.CURRENT);
obj.writeTo(stream);
}
CodecUtil.writeFooter(indexOutput);
}
return outputStream.bytes();
}
}

public T deserialize(String blobName, BytesReference bytes) throws IOException {
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
try {
final IndexInput indexInput = bytes.length() > 0
? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
BytesReference bytesReference = bytes.slice((int) filePointer, (int) contentSize);
Compressor compressor = CompressorRegistry.compressorForWritable(bytesReference);
try (StreamInput in = new InputStreamStreamInput(compressor.threadLocalInputStream(bytesReference.streamInput()))) {
return reader.apply(in);
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.blobstore;

import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.compress.DeflateCompressor;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.is;

/**
* Tests for {@link ChecksumWritableBlobStoreFormat}
*/
public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase {
private static final String TEST_BLOB_FILE_NAME = "test-blob-name";
private static final long VERSION = 5L;

private final ChecksumWritableBlobStoreFormat<IndexMetadata> clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>(
"index-metadata",
IndexMetadata::readFrom
);

public void testSerDe() throws IOException {
IndexMetadata indexMetadata = getIndexMetadata();
BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none());
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference);
assertThat(readIndexMetadata, is(indexMetadata));
}

public void testSerDeForCompressed() throws IOException {
IndexMetadata indexMetadata = getIndexMetadata();
BytesReference bytesReference = clusterBlocksFormat.serialize(
indexMetadata,
TEST_BLOB_FILE_NAME,
CompressorRegistry.getCompressor(DeflateCompressor.NAME)
);
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, bytesReference);
assertThat(readIndexMetadata, is(indexMetadata));
}

private IndexMetadata getIndexMetadata() {
final Index index = new Index("test-index", "index-uuid");
final Settings idxSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
.build();
return new IndexMetadata.Builder(index.getName()).settings(idxSettings)
.version(VERSION)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
}
}

0 comments on commit 632c671

Please sign in to comment.