Skip to content

Commit

Permalink
Minor object store improvement - complete faster (#1237)
Browse files Browse the repository at this point in the history
* Minor object store improvement - complete faster

* validation order

* short circuit
  • Loading branch information
scottf authored Oct 10, 2024
1 parent 30b264c commit 43f9a01
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsFeatureBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class NatsFeatureBase {

protected final NatsJetStream js;
protected final JetStreamManagement jsm;
protected final NatsJetStreamManagement jsm;
protected String streamName;

NatsFeatureBase(NatsConnection connection, FeatureOptions fo) throws IOException {
Expand Down
32 changes: 22 additions & 10 deletions src/main/java/io/nats/client/impl/NatsObjectStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.io.*;
import java.nio.file.Files;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -218,6 +217,7 @@ public ObjectInfo get(String objectName, OutputStream out) throws IOException, J
Digester digester = new Digester();
long totalBytes = 0;
long totalChunks = 0;
long expectedChunks = oi.getChunks();

// if there is one chunk, just go get the message directly and we're done.
if (oi.getChunks() == 1) {
Expand All @@ -236,30 +236,42 @@ public ObjectInfo get(String objectName, OutputStream out) throws IOException, J
JetStreamSubscription sub = js.subscribe(rawChunkSubject(oi.getNuid()),
PushSubscribeOptions.builder().stream(streamName).ordered(true).build());

Message m = sub.nextMessage(Duration.ofSeconds(1));
Message m = sub.nextMessage(jsm.getTimeout());
while (m != null) {
// track the byte count and chunks
long pending = m.metaData().pendingCount();
if (expectedChunks != pending + (++totalChunks)) {
throw OsGetChunksMismatch.instance(); // short circuit, we already know there are not enough chunks.
}

byte[] data = m.getData();
totalBytes += data.length;

// track the byte count and chunks
// update the digest
// write the bytes to the output file
totalBytes += data.length;
totalChunks++;
digester.update(data);

// write the bytes to the output file
out.write(data);

// read until the subject is complete
m = sub.nextMessage(Duration.ofSeconds(1));
if (pending == 0) {
break;
}
m = sub.nextMessage(jsm.getTimeout());
}

sub.unsubscribe();
try {
sub.unsubscribe();
}
catch (RuntimeException ignore) {}
}
out.flush();

if (totalBytes != oi.getSize()) { throw OsGetSizeMismatch.instance(); }
if (totalChunks != oi.getChunks()) { throw OsGetChunksMismatch.instance(); }
if (totalBytes != oi.getSize()) { throw OsGetSizeMismatch.instance(); }
if (!digester.matches(oi.getDigest())) { throw OsGetDigestMismatch.instance(); }

out.flush(); // moved after validation, no need if invalid

return oi;
}

Expand Down

0 comments on commit 43f9a01

Please sign in to comment.