Skip to content

Commit

Permalink
Changes to introduce input stream
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <[email protected]>
  • Loading branch information
Bukhtawar committed Sep 20, 2022
1 parent 593c8d1 commit 0fcdc6e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,10 @@ public long getMinTranslogGeneration() {
return minTranslogGeneration;
}

public long getGeneration() {
return generation;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;

/**
Expand Down Expand Up @@ -47,14 +47,9 @@ public void uploadBlobAsync(
assert remoteTransferPath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remoteTransferPath;
executorService.execute(ActionRunnable.wrap(listener, l -> {
try {
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath)
.writeBlobAtomic(
fileSnapshot.getName(),
new ByteArrayInputStream(fileSnapshot.getContent()),
fileSnapshot.getContentLength(),
true
);
.writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
l.onResponse(fileSnapshot);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), e);
Expand All @@ -67,14 +62,8 @@ public void uploadBlobAsync(
public void uploadBlob(final TransferFileSnapshot fileSnapshot, Iterable<String> remoteTransferPath) throws IOException {
assert remoteTransferPath instanceof BlobPath;
BlobPath blobPath = (BlobPath) remoteTransferPath;
try {
blobStore.blobContainer(blobPath)
.writeBlobAtomic(
fileSnapshot.getName(),
new ByteArrayInputStream(fileSnapshot.getContent()),
fileSnapshot.getContentLength(),
true
);
try (InputStream inputStream = fileSnapshot.inputStream()) {
blobStore.blobContainer(blobPath).writeBlobAtomic(fileSnapshot.getName(), inputStream, fileSnapshot.getContentLength(), true);
} catch (Exception ex) {
logger.error(() -> new ParameterizedMessage("Failed to upload blob {}", fileSnapshot.getName()), ex);
throw ex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,49 +9,50 @@
package org.opensearch.index.translog.transfer;

import org.opensearch.common.Nullable;
import org.opensearch.common.io.stream.ByteBufferStreamInput;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.InputStreamStreamInput;
import org.opensearch.core.internal.io.IOUtils;
import org.opensearch.index.translog.BufferedChecksumStreamInput;

import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Objects;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;

/**
* Snapshot of a single file that gets transferred
*
* @opensearch.internal
*/
public class FileSnapshot {
public class FileSnapshot implements Closeable {

private final long checksum;
private final byte[] content;
private final String name;
private final long contentLength;
@Nullable
private final FileChannel fileChannel;
@Nullable
private Path path;
@Nullable
private byte[] content;

public FileSnapshot(Path path) throws IOException {
Objects.requireNonNull(path);
this.name = path.getFileName().toString();
this.name = path.toString();
this.path = path;
try (CheckedInputStream stream = new CheckedInputStream(Files.newInputStream(path), new CRC32())) {
this.content = stream.readAllBytes();
this.checksum = stream.getChecksum().getValue();
this.contentLength = content.length;
}
this.fileChannel = FileChannel.open(path, StandardOpenOption.READ);
}

public FileSnapshot(String name, byte[] content) throws IOException {
Objects.requireNonNull(content);
Objects.requireNonNull(name);
this.name = name;
try (CheckedInputStream stream = new CheckedInputStream(new ByteArrayInputStream(content), new CRC32())) {
this.content = stream.readAllBytes();
this.checksum = stream.getChecksum().getValue();
this.contentLength = content.length;
}
this.content = content;
this.fileChannel = null;
}

public Path getPath() {
Expand All @@ -62,21 +63,20 @@ public String getName() {
return name;
}

public byte[] getContent() {
return content;
public long getContentLength() throws IOException {
return fileChannel == null ? fileChannel.size() : content.length ;
}

public long getChecksum() {
return checksum;
}

public long getContentLength() {
return contentLength;
public InputStream inputStream() throws IOException {
return fileChannel != null ? new BufferedChecksumStreamInput(
new InputStreamStreamInput(Channels.newInputStream(fileChannel), fileChannel.size()),
path.toString())
: new BufferedChecksumStreamInput(new BytesStreamInput(content), name);
}

@Override
public int hashCode() {
return Objects.hash(name, path, checksum, contentLength);
return Objects.hash(name, content, path);
}

@Override
Expand All @@ -85,9 +85,8 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
FileSnapshot other = (FileSnapshot) o;
return Objects.equals(this.name, other.name)
&& Objects.equals(this.path, other.path)
&& Objects.equals(this.checksum, other.checksum)
&& Objects.equals(this.contentLength, other.contentLength);
&& Objects.equals(this.content, other.content)
&& Objects.equals(this.path, other.path);
}

@Override
Expand All @@ -96,14 +95,16 @@ public String toString() {
.append(name)
.append(", path = ")
.append(path.toUri())
.append(", checksum = ")
.append(checksum)
.append(", contentLength = ")
.append(contentLength)
.append("]")
.toString();
}

@Override
public void close() throws IOException {
IOUtils.close(fileChannel);
}


public static class TransferFileSnapshot extends FileSnapshot {

private final long primaryTerm;
Expand Down Expand Up @@ -171,11 +172,18 @@ public boolean equals(Object o) {

public static class CheckpointFileSnapshot extends TransferFileSnapshot {

private final long generation;

private final long minTranslogGeneration;

public CheckpointFileSnapshot(long primaryTerm, long minTranslogGeneration, Path path) throws IOException {
public CheckpointFileSnapshot(long primaryTerm, long generation, long minTranslogGeneration, Path path) throws IOException {
super(path, primaryTerm);
this.minTranslogGeneration = minTranslogGeneration;
this.generation = generation;
}

public long getGeneration() {
return generation;
}

public long getMinTranslogGeneration() {
Expand All @@ -184,7 +192,7 @@ public long getMinTranslogGeneration() {

@Override
public int hashCode() {
return Objects.hash(minTranslogGeneration, super.hashCode());
return Objects.hash(generation, minTranslogGeneration, super.hashCode());
}

@Override
Expand All @@ -193,7 +201,8 @@ public boolean equals(Object o) {
if (this == o) return true;
if (getClass() != o.getClass()) return false;
CheckpointFileSnapshot other = (CheckpointFileSnapshot) o;
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration);
return Objects.equals(this.minTranslogGeneration, other.minTranslogGeneration)
&& Objects.equals(this.generation, other.generation);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot;
import static org.opensearch.index.translog.transfer.FileSnapshot.TranslogFileSnapshot;
Expand All @@ -45,11 +47,12 @@ public TransferSnapshotProvider(
final long readerGeneration = reader.getGeneration();
final long readerPrimaryTerm = reader.getPrimaryTerm();
final long minTranslogGeneration = reader.getCheckpoint().getMinTranslogGeneration();
final long checkpointGeneration = reader.getCheckpoint().getGeneration();
Path translogPath = reader.path();
Path checkpointPath = location.resolve(checkpointGenFileNameMapper.apply(readerGeneration));
translogTransferSnapshot.add(
new TranslogFileSnapshot(readerPrimaryTerm, readerGeneration, translogPath),
new CheckpointFileSnapshot(readerPrimaryTerm, minTranslogGeneration, checkpointPath)
new CheckpointFileSnapshot(readerPrimaryTerm, checkpointGeneration, minTranslogGeneration, checkpointPath)
);
}
}
Expand All @@ -63,32 +66,46 @@ static class TranslogCheckpointTransferSnapshot implements TransferSnapshot {

private final Set<Tuple<TranslogFileSnapshot, CheckpointFileSnapshot>> translogCheckpointFileInfoTupleSet;
private final int size;
private final List<Long> generations;
private CheckpointFileSnapshot latestCheckPointFileSnapshot;
private TranslogFileSnapshot latestTranslogFileSnapshot;
private final long generation;
private long highestGeneration;
private long lowestGeneration;
private final long primaryTerm;

TranslogCheckpointTransferSnapshot(long primaryTerm, long generation, int size) {
translogCheckpointFileInfoTupleSet = new HashSet<>(size);
this.size = size;
this.generations = new LinkedList<>();
this.generation = generation;
this.primaryTerm = primaryTerm;
this.highestGeneration = Long.MIN_VALUE;
this.lowestGeneration = Long.MAX_VALUE;
}

private void add(TranslogFileSnapshot translogFileSnapshot, CheckpointFileSnapshot checkPointFileSnapshot) {
translogCheckpointFileInfoTupleSet.add(Tuple.tuple(translogFileSnapshot, checkPointFileSnapshot));
assert translogFileSnapshot.getGeneration() == checkPointFileSnapshot.getGeneration();
generations.add(translogFileSnapshot.getGeneration());
if (highestGeneration < translogFileSnapshot.getGeneration()) {
latestCheckPointFileSnapshot = checkPointFileSnapshot;
latestTranslogFileSnapshot = translogFileSnapshot;
highestGeneration = translogFileSnapshot.getGeneration();
}
this.lowestGeneration = Math.min(lowestGeneration, translogFileSnapshot.getGeneration());
}

private void assertInvariants() {
assert this.primaryTerm == latestTranslogFileSnapshot.getPrimaryTerm() : "inconsistent primary term";
assert this.generation == highestGeneration : "inconsistent generation";
assert translogCheckpointFileInfoTupleSet.size() == size : "inconsistent translog and checkpoint file count";
assert highestGeneration <= lowestGeneration : "lowest generation is greater than highest generation";
assert LongStream.iterate(lowestGeneration, i -> i + 1)
.limit(highestGeneration)
.boxed()
.collect(Collectors.toList())
.equals(generations.stream().sorted().collect(Collectors.toList())) == true : "generation gaps found";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.index.translog.transfer.listener.FileTransferListener;
import org.opensearch.index.translog.transfer.listener.TranslogTransferListener;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -132,16 +131,12 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot)
);
TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata();
translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap));
TransferFileSnapshot fileSnapshot;
try (BytesStreamOutput output = new BytesStreamOutput()) {
translogTransferMetadata.writeTo(output);
fileSnapshot = new TransferFileSnapshot(
translogTransferMetadata.getMetadataFileName(),
BytesReference.toBytes(output.bytes()),
-1
);
TransferFileSnapshot fileSnapshot = new TransferFileSnapshot(
translogTransferMetadata.getFileName(),
translogTransferMetadata.createMetadataBytes(),
translogTransferMetadata.getPrimaryTerm()
);

}
return fileSnapshot;
}
}
Loading

0 comments on commit 0fcdc6e

Please sign in to comment.