diff --git a/src/main/java/io/nats/client/JetStreamManagement.java b/src/main/java/io/nats/client/JetStreamManagement.java index c1058da4d..499c8ef4d 100644 --- a/src/main/java/io/nats/client/JetStreamManagement.java +++ b/src/main/java/io/nats/client/JetStreamManagement.java @@ -282,6 +282,35 @@ public interface JetStreamManagement { */ MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException; + /** + * Get MessageInfo for the first message created at or after the start time. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream. + * @param startTime the start time to get the first message for. + * @return The MessageInfo + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException; + + /** + * Get MessageInfo for the first message created at or after the start time matching the subject. + *

+ * This API is currently EXPERIMENTAL and is subject to change. + * + * @param streamName the name of the stream. + * @param startTime the start time to get the first message for. + * @param subject the subject to get the first message for. + * @return The MessageInfo + * @throws IOException covers various communication issues with the NATS + * server such as timeout or interruption + * @throws JetStreamApiException the request had an error related to the data + */ + MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException; + /** * Get MessageInfo for the message of the message sequence * is equal to or greater the requested sequence for the subject. diff --git a/src/main/java/io/nats/client/api/MessageGetRequest.java b/src/main/java/io/nats/client/api/MessageGetRequest.java index 60d9d6851..da608587e 100644 --- a/src/main/java/io/nats/client/api/MessageGetRequest.java +++ b/src/main/java/io/nats/client/api/MessageGetRequest.java @@ -15,6 +15,8 @@ import io.nats.client.support.JsonSerializable; +import java.time.ZonedDateTime; + import static io.nats.client.support.ApiConstants.*; import static io.nats.client.support.JsonUtils.*; @@ -25,21 +27,30 @@ public class MessageGetRequest implements JsonSerializable { private final long sequence; private final String lastBySubject; private final String nextBySubject; + private final ZonedDateTime startTime; public static MessageGetRequest forSequence(long sequence) { - return new MessageGetRequest(sequence, null, null); + return new MessageGetRequest(sequence, null, null, null); } public static MessageGetRequest lastForSubject(String subject) { - return new MessageGetRequest(-1, subject, null); + return new MessageGetRequest(-1, subject, null, null); } public static MessageGetRequest firstForSubject(String subject) { - return new MessageGetRequest(-1, null, subject); + return new MessageGetRequest(-1, null, subject, null); + } + + public static MessageGetRequest firstForStartTime(ZonedDateTime startTime) { + return new MessageGetRequest(-1, null, null, startTime); + } + + public static MessageGetRequest firstForStartTimeAndSubject(ZonedDateTime startTime, String subject) { + return new MessageGetRequest(-1, null, subject, startTime); } public static MessageGetRequest nextForSubject(long sequence, String subject) { - return new MessageGetRequest(sequence, null, subject); + return new MessageGetRequest(sequence, null, subject, null); } /** @@ -69,7 +80,7 @@ public static byte[] lastBySubjectBytes(String subject) { */ @Deprecated public MessageGetRequest(long sequence) { - this(sequence, null, null); + this(sequence, null, null, null); } /** @@ -79,13 +90,14 @@ public MessageGetRequest(long sequence) { */ @Deprecated public MessageGetRequest(String lastBySubject) { - this(-1, lastBySubject, null); + this(-1, lastBySubject, null, null); } - private MessageGetRequest(long sequence, String lastBySubject, String nextBySubject) { + private MessageGetRequest(long sequence, String lastBySubject, String nextBySubject, ZonedDateTime startTime) { this.sequence = sequence; this.lastBySubject = lastBySubject; this.nextBySubject = nextBySubject; + this.startTime = startTime; } public long getSequence() { @@ -118,6 +130,7 @@ public String toJson() { addField(sb, SEQ, sequence); addField(sb, LAST_BY_SUBJECT, lastBySubject); addField(sb, NEXT_BY_SUBJECT, nextBySubject); + addField(sb, START_TIME, startTime); return endJson(sb).toString(); } } diff --git a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java index 0cd15fb86..b6fe2c7cd 100644 --- a/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java +++ b/src/main/java/io/nats/client/impl/NatsJetStreamManagement.java @@ -289,6 +289,22 @@ public MessageInfo getFirstMessage(String streamName, String subject) throws IOE return _getMessage(streamName, MessageGetRequest.firstForSubject(subject)); } + /** + * {@inheritDoc} + */ + @Override + public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException { + return _getMessage(streamName, MessageGetRequest.firstForStartTime(startTime)); + } + + /** + * {@inheritDoc} + */ + @Override + public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException { + return _getMessage(streamName, MessageGetRequest.firstForStartTimeAndSubject(startTime, subject)); + } + /** * {@inheritDoc} */ diff --git a/src/main/java/io/nats/client/support/ApiConstants.java b/src/main/java/io/nats/client/support/ApiConstants.java index 3d6025c1f..16ff26f17 100644 --- a/src/main/java/io/nats/client/support/ApiConstants.java +++ b/src/main/java/io/nats/client/support/ApiConstants.java @@ -176,6 +176,7 @@ public interface ApiConstants { String SOURCES = "sources"; String SRC = "src"; String STARTED = "started"; + String START_TIME = "start_time"; String STATE = "state"; String STATS = "stats"; String STORAGE = "storage"; diff --git a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java index a8413e12b..47f25d989 100644 --- a/src/test/java/io/nats/client/impl/JetStreamManagementTests.java +++ b/src/test/java/io/nats/client/impl/JetStreamManagementTests.java @@ -1293,6 +1293,8 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 0, tsc.subject(1)), beforeCreated); assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, tsc.subject(0)), beforeCreated); assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, tsc.subject(1)), beforeCreated); + assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, beforeCreated), beforeCreated); + assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, beforeCreated, tsc.subject(1)), beforeCreated); assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 1, tsc.subject(0)), beforeCreated); assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 1, tsc.subject(1)), beforeCreated); @@ -1305,6 +1307,7 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, -1))); assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 0))); + assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, DEFAULT_TIME))); assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 9))); assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(tsc.stream, "not-a-subject"))); assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, "not-a-subject")));