Skip to content

Commit

Permalink
Easier access to timeout in JetStream implementations (#1236)
Browse files Browse the repository at this point in the history
  • Loading branch information
scottf authored Oct 8, 2024
1 parent 27efeab commit d263667
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/NatsFeatureBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ protected void visitSubject(List<String> subjects, DeliverPolicy deliverPolicy,
.configuration(ccb.build())
.build();

Duration timeout = js.jso.getRequestTimeout();
Duration timeout = js.getTimeout();
JetStreamSubscription sub = js.subscribe(null, pso);
try {
boolean lastWasNull = false;
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/nats/client/impl/NatsJetStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ private PublishAck publishSyncInternal(String subject, Headers headers, byte[] d
return null;
}

Duration timeout = options == null ? jso.getRequestTimeout() : options.getStreamTimeout();

Message resp = makeInternalRequestResponseRequired(subject, merged, data, timeout, CancelAction.COMPLETE, validateSubjectAndReplyTo);
Message resp = makeInternalRequestResponseRequired(subject, merged, data, getTimeout(), CancelAction.COMPLETE, validateSubjectAndReplyTo);
return processPublishResponse(resp, options);
}

Expand Down
18 changes: 12 additions & 6 deletions src/main/java/io/nats/client/impl/NatsJetStreamImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public CachedStreamInfo(StreamInfo si) {

final NatsConnection conn;
final JetStreamOptions jso;
final Duration timeout;
final boolean consumerCreate290Available;
final boolean multipleSubjectFilter210Available;
final boolean directBatchGet211Available;
Expand All @@ -59,8 +60,8 @@ public CachedStreamInfo(StreamInfo si) {
// Clone the input jsOptions (JetStreamOptions.builder(...) handles null.
// If jsOptions is not supplied or the jsOptions request timeout
// was not set, use the connection options connect timeout.
Duration rt = jsOptions == null || jsOptions.getRequestTimeout() == null ? conn.getOptions().getConnectionTimeout() : jsOptions.getRequestTimeout();
jso = JetStreamOptions.builder(jsOptions).requestTimeout(rt).build();
timeout = jsOptions == null || jsOptions.getRequestTimeout() == null ? conn.getOptions().getConnectionTimeout() : jsOptions.getRequestTimeout();
jso = JetStreamOptions.builder(jsOptions).requestTimeout(timeout).build();

consumerCreate290Available = conn.getInfo().isSameOrNewerThanVersion("2.9.0") && !jso.isOptOut290ConsumerCreate();
multipleSubjectFilter210Available = conn.getInfo().isNewerVersionThan("2.9.99");
Expand All @@ -70,17 +71,22 @@ public CachedStreamInfo(StreamInfo si) {
NatsJetStreamImpl(NatsJetStreamImpl impl) {
conn = impl.conn;
jso = impl.jso;
timeout = impl.timeout;
consumerCreate290Available = impl.consumerCreate290Available;
multipleSubjectFilter210Available = impl.multipleSubjectFilter210Available;
directBatchGet211Available = impl.directBatchGet211Available;
}

Duration getTimeout() {
return timeout;
}

// ----------------------------------------------------------------------------------------------------
// Management that is also needed by regular context
// ----------------------------------------------------------------------------------------------------
ConsumerInfo _getConsumerInfo(String streamName, String consumerName) throws IOException, JetStreamApiException {
String subj = String.format(JSAPI_CONSUMER_INFO, streamName, consumerName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
return new ConsumerInfo(resp).throwOnHasError();
}

Expand Down Expand Up @@ -122,7 +128,7 @@ else if (durable == null) {
}

ConsumerCreateRequest ccr = new ConsumerCreateRequest(streamName, config, action);
Message resp = makeRequestResponseRequired(subj, ccr.serialize(), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, ccr.serialize(), getTimeout());
return new ConsumerInfo(resp).throwOnHasError();
}

Expand All @@ -147,7 +153,7 @@ StreamInfo _getStreamInfo(String streamName, StreamInfoOptions options) throws I
String subj = String.format(JSAPI_STREAM_INFO, streamName);
StreamInfoReader sir = new StreamInfoReader();
while (sir.hasMore()) {
Message resp = makeRequestResponseRequired(subj, sir.nextJson(options), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, sir.nextJson(options), getTimeout());
sir.process(resp);
}
return cacheStreamInfo(streamName, sir.getStreamInfo());
Expand All @@ -170,7 +176,7 @@ List<StreamInfo> cacheStreamInfo(List<StreamInfo> list) {
List<String> _getStreamNames(String subjectFilter) throws IOException, JetStreamApiException {
StreamNamesReader snr = new StreamNamesReader();
while (snr.hasMore()) {
Message resp = makeRequestResponseRequired(JSAPI_STREAM_NAMES, snr.nextJson(subjectFilter), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(JSAPI_STREAM_NAMES, snr.nextJson(subjectFilter), getTimeout());
snr.process(resp);
}
return snr.getStrings();
Expand Down
28 changes: 14 additions & 14 deletions src/main/java/io/nats/client/impl/NatsJetStreamManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public NatsJetStreamManagement(NatsConnection connection, JetStreamOptions jsOpt
*/
@Override
public AccountStatistics getAccountStatistics() throws IOException, JetStreamApiException {
Message resp = makeRequestResponseRequired(JSAPI_ACCOUNT_INFO, null, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(JSAPI_ACCOUNT_INFO, null, getTimeout());
return new AccountStatistics(resp).throwOnHasError();
}

Expand Down Expand Up @@ -69,7 +69,7 @@ private StreamInfo addOrUpdateStream(StreamConfiguration config, String template
}

String subj = String.format(template, streamName);
Message resp = makeRequestResponseRequired(subj, config.toJson().getBytes(StandardCharsets.UTF_8), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, config.toJson().getBytes(StandardCharsets.UTF_8), getTimeout());
return createAndCacheStreamInfoThrowOnError(streamName, resp);
}

Expand All @@ -80,7 +80,7 @@ private StreamInfo addOrUpdateStream(StreamConfiguration config, String template
public boolean deleteStream(String streamName) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
String subj = String.format(JSAPI_STREAM_DELETE, streamName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

Expand Down Expand Up @@ -109,7 +109,7 @@ public StreamInfo getStreamInfo(String streamName, StreamInfoOptions options) th
public PurgeResponse purgeStream(String streamName) throws IOException, JetStreamApiException {
validateNotNull(streamName, "Stream Name");
String subj = String.format(JSAPI_STREAM_PURGE, streamName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
return new PurgeResponse(resp).throwOnHasError();
}

Expand All @@ -122,7 +122,7 @@ public PurgeResponse purgeStream(String streamName, PurgeOptions options) throws
validateNotNull(options, "Purge Options");
String subj = String.format(JSAPI_STREAM_PURGE, streamName);
byte[] body = options.toJson().getBytes(StandardCharsets.UTF_8);
Message resp = makeRequestResponseRequired(subj, body, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, body, getTimeout());
return new PurgeResponse(resp).throwOnHasError();
}

Expand Down Expand Up @@ -164,7 +164,7 @@ public boolean deleteConsumer(String streamName, String consumerName) throws IOE
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_DELETE, streamName, consumerName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

Expand All @@ -177,7 +177,7 @@ public ConsumerPauseResponse pauseConsumer(String streamName, String consumerNam
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
ConsumerPauseRequest pauseRequest = new ConsumerPauseRequest(pauseUntil);
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, pauseRequest.serialize(), getTimeout());
return new ConsumerPauseResponse(resp).throwOnHasError();
}

Expand All @@ -189,7 +189,7 @@ public boolean resumeConsumer(String streamName, String consumerName) throws IOE
validateNotNull(streamName, "Stream Name");
validateNotNull(consumerName, "Consumer Name");
String subj = String.format(JSAPI_CONSUMER_PAUSE, streamName, consumerName);
Message resp = makeRequestResponseRequired(subj, null, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, null, getTimeout());
ConsumerPauseResponse response = new ConsumerPauseResponse(resp).throwOnHasError();
return !response.isPaused();
}
Expand All @@ -216,7 +216,7 @@ private List<String> getConsumerNames(String streamName, String filter) throws I
String subj = String.format(JSAPI_CONSUMER_NAMES, streamName);
ConsumerNamesReader cnr = new ConsumerNamesReader();
while (cnr.hasMore()) {
Message resp = makeRequestResponseRequired(subj, cnr.nextJson(filter), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, cnr.nextJson(filter), getTimeout());
cnr.process(resp);
}
return cnr.getStrings();
Expand All @@ -230,7 +230,7 @@ public List<ConsumerInfo> getConsumers(String streamName) throws IOException, Je
String subj = String.format(JSAPI_CONSUMER_LIST, streamName);
ConsumerListReader clg = new ConsumerListReader();
while (clg.hasMore()) {
Message resp = makeRequestResponseRequired(subj, clg.nextJson(), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, clg.nextJson(), getTimeout());
clg.process(resp);
}
return clg.getConsumers();
Expand Down Expand Up @@ -264,7 +264,7 @@ public List<StreamInfo> getStreams() throws IOException, JetStreamApiException {
public List<StreamInfo> getStreams(String subjectFilter) throws IOException, JetStreamApiException {
StreamListReader slr = new StreamListReader();
while (slr.hasMore()) {
Message resp = makeRequestResponseRequired(JSAPI_STREAM_LIST, slr.nextJson(subjectFilter), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(JSAPI_STREAM_LIST, slr.nextJson(subjectFilter), getTimeout());
slr.process(resp);
}
return cacheStreamInfo(slr.getStreams());
Expand Down Expand Up @@ -332,15 +332,15 @@ private MessageInfo _getMessage(String streamName, MessageGetRequest messageGetR
subject = String.format(JSAPI_DIRECT_GET, streamName);
payload = messageGetRequest.serialize();
}
Message resp = makeRequestResponseRequired(subject, payload, jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subject, payload, getTimeout());
if (resp.isStatusMessage()) {
throw new JetStreamApiException(Error.convert(resp.getStatus()));
}
return new MessageInfo(resp, streamName, true);
}
else {
String getSubject = String.format(JSAPI_MSG_GET, streamName);
Message resp = makeRequestResponseRequired(getSubject, messageGetRequest.serialize(), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(getSubject, messageGetRequest.serialize(), getTimeout());
return new MessageInfo(resp, streamName, false).throwOnHasError();
}
}
Expand Down Expand Up @@ -464,7 +464,7 @@ public boolean deleteMessage(String streamName, long seq, boolean erase) throws
validateNotNull(streamName, "Stream Name");
String subj = String.format(JSAPI_MSG_DELETE, streamName);
MessageDeleteRequest mdr = new MessageDeleteRequest(seq, erase);
Message resp = makeRequestResponseRequired(subj, mdr.serialize(), jso.getRequestTimeout());
Message resp = makeRequestResponseRequired(subj, mdr.serialize(), getTimeout());
return new SuccessApiResponse(resp).throwOnHasError().getSuccess();
}

Expand Down

0 comments on commit d263667

Please sign in to comment.