Skip to content

Commit

Permalink
Create serde utility for Writable classes
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <[email protected]>
  • Loading branch information
soosinha committed Jun 8, 2024
1 parent fbe048f commit 7388ee9
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.blobstore;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.BytesRef;
import org.opensearch.Version;
import org.opensearch.common.CheckedFunction;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.common.lucene.store.IndexOutputOutputStream;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.common.io.stream.OutputStreamStreamOutput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.compress.Compressor;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.core.compress.NotXContentException;
import org.opensearch.gateway.CorruptStateException;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

/**
* Checksum File format used to serialize/deserialize {@link Writeable} objects
*
* @opensearch.internal
*/
public class ChecksumWritableBlobStoreFormat<T extends Writeable> {

public static final int VERSION = 1;

private static final int BUFFER_SIZE = 4096;

private final String codec;
private final CheckedFunction<StreamInput, T, IOException> reader;

public ChecksumWritableBlobStoreFormat(String codec, CheckedFunction<StreamInput, T, IOException> reader) {
this.codec = codec;
this.reader = reader;
}

public BytesReference serialize(final T obj, final String blobName, final Compressor compressor) throws IOException {
try (BytesStreamOutput outputStream = new BytesStreamOutput()) {
try (
OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(
"ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")",
blobName,
outputStream,
BUFFER_SIZE
)
) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);

try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
}
}; StreamOutput stream = new OutputStreamStreamOutput(compressor.threadLocalOutputStream(indexOutputOutputStream));) {
// TODO The stream version should be configurable
stream.setVersion(Version.CURRENT);
obj.writeTo(stream);
}
CodecUtil.writeFooter(indexOutput);
}
return outputStream.bytes();
}
}

public T deserialize(String blobName, CheckedFunction<StreamInput, T, IOException> reader, BytesReference bytes) throws IOException {
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
try {
final IndexInput indexInput = bytes.length() > 0
? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
BytesReference bytesReference = bytes.slice((int) filePointer, (int) contentSize);
Compressor compressor;
try {
compressor = CompressorRegistry.compressor(bytesReference);
} catch (NotXContentException e) {
compressor = CompressorRegistry.none();
}
try (StreamInput in = new InputStreamStreamInput(compressor.threadLocalInputStream(bytesReference.streamInput()))) {
return reader.apply(in);
}
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.blobstore;

import org.opensearch.Version;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.compress.CompressorRegistry;
import org.opensearch.core.index.Index;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;

import static org.hamcrest.Matchers.is;

/**
* Tests for {@link ChecksumWritableBlobStoreFormat}
*/
public class ChecksumWritableBlobStoreFormatTests extends OpenSearchTestCase {
private static final String TEST_BLOB_FILE_NAME = "test-blob-name";
private static final long VERSION = 5L;

private final ChecksumWritableBlobStoreFormat<IndexMetadata> clusterBlocksFormat = new ChecksumWritableBlobStoreFormat<>(
"index-metadata",
IndexMetadata::readFrom
);

public void testSerDe() throws IOException {
IndexMetadata indexMetadata = getIndexMetadata();
BytesReference bytesReference = clusterBlocksFormat.serialize(indexMetadata, TEST_BLOB_FILE_NAME, CompressorRegistry.none());
IndexMetadata readIndexMetadata = clusterBlocksFormat.deserialize(TEST_BLOB_FILE_NAME, IndexMetadata::readFrom, bytesReference);
assertThat(readIndexMetadata, is(indexMetadata));
}

private IndexMetadata getIndexMetadata() {
final Index index = new Index("test-index", "index-uuid");
final Settings idxSettings = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_INDEX_UUID, index.getUUID())
.build();
return new IndexMetadata.Builder(index.getName()).settings(idxSettings)
.version(VERSION)
.numberOfShards(1)
.numberOfReplicas(0)
.build();
}
}

0 comments on commit 7388ee9

Please sign in to comment.