Skip to content

Commit

Permalink
[FIXED] Creating ephemeral consumers on server < 2.9.0
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Jun 19, 2023
1 parent 61b0ce9 commit ec00e66
Showing 1 changed file with 16 additions and 7 deletions.
23 changes: 16 additions & 7 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,20 +361,29 @@ func (js *js) upsertConsumer(stream, consumerName string, cfg *ConsumerConfig, o

var ccSubj string
if consumerName == _EMPTY_ {
// if consumer name is empty, use the legacy ephemeral endpoint
// if consumer name is empty (neither Durable nor Name is set), use the legacy ephemeral endpoint
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
} else if err := checkConsumerName(consumerName); err != nil {
return nil, err
} else if !js.nc.serverMinVersion(2, 9, 0) || (cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate) {
// if server version is lower than 2.9.0 or user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
// if above server version 2.9.0, use the endpoints with consumer name
if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
} else if js.nc.serverMinVersion(2, 9, 0) {
if cfg.Durable != "" && js.opts.featureFlags.useDurableConsumerCreate {
// if user set the useDurableConsumerCreate flag, use the legacy DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else if cfg.FilterSubject == _EMPTY_ || cfg.FilterSubject == ">" {
// if filter subject is empty or ">", use the endpoint without filter subject
ccSubj = fmt.Sprintf(apiConsumerCreateT, stream, consumerName)
} else {
// if filter subject is not empty, use the endpoint with filter subject
ccSubj = fmt.Sprintf(apiConsumerCreateWithFilterSubjectT, stream, consumerName, cfg.FilterSubject)
}
} else {
if cfg.Durable != "" {
// if Durable is set, use the DURABLE.CREATE endpoint
ccSubj = fmt.Sprintf(apiDurableCreateT, stream, consumerName)
} else {
// if Durable is not set, use the legacy ephemeral endpoint
ccSubj = fmt.Sprintf(apiLegacyConsumerCreateT, stream)
}
}

resp, err := js.apiRequestWithContext(o.ctx, js.apiSubj(ccSubj), req)
Expand Down

0 comments on commit ec00e66

Please sign in to comment.