Skip to content

Commit

Permalink
getFirstMessage is a 2.11 feature (#1246)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Nov 1, 2024
1 parent 1ceea55 commit 10db91d
Show file tree
Hide file tree
Showing 8 changed files with 6 additions and 103 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ public class BaseConsumeOptions implements JsonSerializable {
protected BaseConsumeOptions(Builder b) {
bytes = b.bytes;
if (bytes > 0) {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT_WHEN_BYTES : b.messages;
}
else {
messages = b.messages == -1 ? DEFAULT_MESSAGE_COUNT : b.messages;
messages = b.messages < 0 ? DEFAULT_MESSAGE_COUNT : b.messages;
}

// validation handled in builder
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/nats/client/ConnectionListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
* listener is configured in the {@link Options Options} at creation time.
*/
public interface ConnectionListener {
public enum Events {
enum Events {
/** The connection has successfully completed the handshake with the nats-server. */
CONNECTED(true, "opened"),
/** The connection is permanently closed, either by manual action or failed reconnects. */
Expand All @@ -29,7 +29,7 @@ public enum Events {
RECONNECTED(true, "reconnected"),
/** The connection was reconnected and the server has been notified of all subscriptions. */
RESUBSCRIBED(false, "subscriptions re-established"),
/** The connection was told about new servers from, from the current server. */
/** The connection was made aware of new servers from the current server connection. */
DISCOVERED_SERVERS(false, "discovered servers"),
/** Server Sent a lame duck mode. */
LAME_DUCK(false, "lame duck mode");
Expand Down Expand Up @@ -77,5 +77,5 @@ public String toString() {
* @param conn the connection associated with the error
* @param type the type of event that has occurred
*/
public void connectionEvent(Connection conn, Events type);
}
void connectionEvent(Connection conn, Events type);
}
40 changes: 0 additions & 40 deletions src/main/java/io/nats/client/JetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,46 +271,6 @@ public interface JetStreamManagement {
*/
MessageInfo getLastMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message of the subject.
* @param streamName the name of the stream.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message created at or after the start time matching the subject.
* <p>
* This API is currently EXPERIMENTAL and is subject to change.
*
* @param streamName the name of the stream.
* @param startTime the start time to get the first message for.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/io/nats/client/StreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,16 +163,6 @@ public interface StreamContext {
*/
MessageInfo getLastMessage(String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the first message of the subject.
* @param subject the subject to get the first message for.
* @return The MessageInfo
* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
MessageInfo getFirstMessage(String subject) throws IOException, JetStreamApiException;

/**
* Get MessageInfo for the message of the message sequence
* is equal to or greater the requested sequence for the subject.
Expand Down
24 changes: 0 additions & 24 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,30 +281,6 @@ public MessageInfo getLastMessage(String streamName, String subject) throws IOEx
return _getMessage(streamName, MessageGetRequest.lastForSubject(subject));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, String subject) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForSubject(subject));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTime(startTime));
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String streamName, ZonedDateTime startTime, String subject) throws IOException, JetStreamApiException {
return _getMessage(streamName, MessageGetRequest.firstForStartTimeAndSubject(startTime, subject));
}

/**
* {@inheritDoc}
*/
Expand Down
8 changes: 0 additions & 8 deletions src/main/java/io/nats/client/impl/NatsStreamContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,6 @@ public MessageInfo getLastMessage(String subject) throws IOException, JetStreamA
return jsm.getLastMessage(streamName, subject);
}

/**
* {@inheritDoc}
*/
@Override
public MessageInfo getFirstMessage(String subject) throws IOException, JetStreamApiException {
return jsm.getFirstMessage(streamName, subject);
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,10 +1291,6 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, -1, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 0, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 0, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, tsc.subject(1)), beforeCreated);
assertMessageInfo(tsc, 0, 1, jsm.getFirstMessage(tsc.stream, beforeCreated), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getFirstMessage(tsc.stream, beforeCreated, tsc.subject(1)), beforeCreated);

assertMessageInfo(tsc, 0, 1, jsm.getNextMessage(tsc.stream, 1, tsc.subject(0)), beforeCreated);
assertMessageInfo(tsc, 1, 2, jsm.getNextMessage(tsc.stream, 1, tsc.subject(1)), beforeCreated);
Expand All @@ -1307,10 +1303,8 @@ private void validateGetMessage(JetStreamManagement jsm, TestingStreamContainer

assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, -1)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 0)));
assertStatus(10003, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, DEFAULT_TIME)));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getMessage(tsc.stream, 9)));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getLastMessage(tsc.stream, "not-a-subject")));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getFirstMessage(tsc.stream, "not-a-subject")));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 9, tsc.subject(0))));
assertStatus(10037, assertThrows(JetStreamApiException.class, () -> jsm.getNextMessage(tsc.stream, 1, "not-a-subject")));
}
Expand Down
9 changes: 0 additions & 9 deletions src/test/java/io/nats/client/impl/SimplificationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,6 @@ private static void _testStreamContext(JetStream js, TestingStreamContainer tsc,
MessageInfo mi = streamContext.getMessage(1);
assertEquals(1, mi.getSeq());

mi = streamContext.getFirstMessage(tsc.subject());
assertEquals(1, mi.getSeq());

mi = streamContext.getLastMessage(tsc.subject());
assertEquals(6, mi.getSeq());

Expand All @@ -123,12 +120,6 @@ private static void _testStreamContext(JetStream js, TestingStreamContainer tsc,

streamContext.purge(PurgeOptions.builder().sequence(5).build());
assertThrows(JetStreamApiException.class, () -> streamContext.getMessage(1));

mi = streamContext.getFirstMessage(tsc.subject());
assertEquals(5, mi.getSeq());

streamContext.purge();
assertThrows(JetStreamApiException.class, () -> streamContext.getFirstMessage(tsc.subject()));
}

static int FETCH_EPHEMERAL = 1;
Expand Down

0 comments on commit 10db91d

Please sign in to comment.