Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Direct Get by start time #1226

Merged
merged 6 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,35 @@ public interface JetStreamManagement {
*/
MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there ever a case when we will want first message by subject and start time? It's probably fine to have this since it seems like a common behavior, but I think it's time to expose something like

MessageInfo getMessage(String streamName, MessageGetRequest messageGetRequest) 

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added, in combination with the new builder.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some discussion offline removed the builder again, as the only combinations that needed to be added were:

  • getFirstMessage(streamName, startTime)
  • getFirstMessage(streamName, startTime, subject)

And then we don't need extra validation in the builder for valid combinations.


/**
* Get MessageInfo for the first message created at or after the start time matching the subject.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
27 changes: 20 additions & 7 deletions src/main/java/io/nats/client/api/MessageGetRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import io.nats.client.support.JsonSerializable;

import java.time.ZonedDateTime;

import static io.nats.client.support.ApiConstants.*;
import static io.nats.client.support.JsonUtils.*;

Expand All @@ -25,21 +27,30 @@ public class MessageGetRequest implements JsonSerializable {
private final long sequence;
private final String lastBySubject;
private final String nextBySubject;
private final ZonedDateTime startTime;

public static MessageGetRequest forSequence(long sequence) {
return new MessageGetRequest(sequence, null, null);
return new MessageGetRequest(sequence, null, null, null);
}

public static MessageGetRequest lastForSubject(String subject) {
return new MessageGetRequest(-1, subject, null);
return new MessageGetRequest(-1, subject, null, null);
}

public static MessageGetRequest firstForSubject(String subject) {
return new MessageGetRequest(-1, null, subject);
return new MessageGetRequest(-1, null, subject, null);
}

public static MessageGetRequest firstForStartTime(ZonedDateTime startTime) {
return new MessageGetRequest(-1, null, null, startTime);
}

public static MessageGetRequest firstForStartTimeAndSubject(ZonedDateTime startTime, String subject) {
return new MessageGetRequest(-1, null, subject, startTime);
}

public static MessageGetRequest nextForSubject(long sequence, String subject) {
return new MessageGetRequest(sequence, null, subject);
return new MessageGetRequest(sequence, null, subject, null);
}

/**
Expand Down Expand Up @@ -69,7 +80,7 @@ public static byte[] lastBySubjectBytes(String subject) {
*/
@Deprecated
public MessageGetRequest(long sequence) {
this(sequence, null, null);
this(sequence, null, null, null);
}

/**
Expand All @@ -79,13 +90,14 @@ public MessageGetRequest(long sequence) {
*/
@Deprecated
public MessageGetRequest(String lastBySubject) {
this(-1, lastBySubject, null);
this(-1, lastBySubject, null, null);
}

private MessageGetRequest(long sequence, String lastBySubject, String nextBySubject) {
private MessageGetRequest(long sequence, String lastBySubject, String nextBySubject, ZonedDateTime startTime) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should add a builder to this class, especially if we are going to add a generic get message api

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the builder 👍

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.sequence = sequence;
this.lastBySubject = lastBySubject;
this.nextBySubject = nextBySubject;
this.startTime = startTime;
}

public long getSequence() {
Expand Down Expand Up @@ -118,6 +130,7 @@ public String toJson() {
addField(sb, SEQ, sequence);
addField(sb, LAST_BY_SUBJECT, lastBySubject);
addField(sb, NEXT_BY_SUBJECT, nextBySubject);
addField(sb, START_TIME, startTime);
return endJson(sb).toString();
}
}
16 changes: 16 additions & 0 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,22 @@ public MessageInfo getFirstMessage(String streamName, String subject) throws IOE
return _getMessage(streamName, MessageGetRequest.firstForSubject(subject));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTime(startTime));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTimeAndSubject(startTime, subject));
}

/**
* {@inheritDoc}
*/
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/support/ApiConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public interface ApiConstants {
String SOURCES = "sources";
String SRC = "src";
String STARTED = "started";
String START_TIME = "start_time";
String STATE = "state";
String STATS = "stats";
String STORAGE = "storage";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1293,6 +1293,8 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 0, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, beforeCreated), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, beforeCreated, tsc.subject(1)), beforeCreated);

assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 1, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 1, tsc.subject(1)), beforeCreated);
Expand All @@ -1305,6 +1307,7 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer

assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, -1)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 0)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, DEFAULT_TIME)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, is it correct that this fails because the start time is before the very first message in the stream?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't fail because of the start time being before the first message.
Instead it fails because a DEFAULT_TIME doesn't get serialized into the final JSON, resulting in an empty JSON object being sent which is checked here should be an error.

assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 9)));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(tsc.stream, "not-a-subject")));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, "not-a-subject")));
Expand Down
Loading