Skip to content

Commit

Permalink
Use new consumer create subject when single subject filter specified …
Browse files Browse the repository at this point in the history
…in `SubjectFilters` (#4564)

This fixes an issue where specifying a single subject filter, i.e. in
`SubjectFilters` or `SubjectTransforms`, instead of using
`SubjectFilter` would result in the old consumer create subject being
incorrectly used.

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
neilalexander authored Sep 20, 2023
2 parents 29ba4aa + 40ce0a9 commit 81c0a14
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 0 deletions.
75 changes: 75 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21853,3 +21853,78 @@ func TestJetStreamSyncInterval(t *testing.T) {
})
}
}

func TestJetStreamFilteredSubjectUsesNewConsumerCreateSubject(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, _ := jsClientConnect(t, s)
defer nc.Close()

extEndpoint := make(chan *nats.Msg, 1)
normalEndpoint := make(chan *nats.Msg, 1)

_, err := nc.ChanSubscribe(JSApiConsumerCreateEx, extEndpoint)
require_NoError(t, err)

_, err = nc.ChanSubscribe(JSApiConsumerCreate, normalEndpoint)
require_NoError(t, err)

testStreamSource := func(name string, shouldBeExtended bool, ss StreamSource) {
t.Run(name, func(t *testing.T) {
req := StreamConfig{
Name: name,
Storage: MemoryStorage,
Subjects: []string{fmt.Sprintf("foo.%s", name)},
Sources: []*StreamSource{&ss},
}
reqJson, err := json.Marshal(req)
require_NoError(t, err)

_, err = nc.Request(fmt.Sprintf(JSApiStreamCreateT, name), reqJson, time.Second)
require_NoError(t, err)

select {
case <-time.After(time.Second * 5):
t.Fatalf("Timed out waiting for receive consumer create")
case <-extEndpoint:
if !shouldBeExtended {
t.Fatalf("Expected normal consumer create, got extended")
}
case <-normalEndpoint:
if shouldBeExtended {
t.Fatalf("Expected extended consumer create, got normal")
}
}
})
}

testStreamSource("OneFilterSubject", true, StreamSource{
Name: "source",
FilterSubject: "bar.>",
})

testStreamSource("OneTransform", true, StreamSource{
Name: "source",
SubjectTransforms: []SubjectTransformConfig{
{
Source: "bar.one.>",
Destination: "bar.two.>",
},
},
})

testStreamSource("TwoTransforms", false, StreamSource{
Name: "source",
SubjectTransforms: []SubjectTransformConfig{
{
Source: "bar.one.>",
Destination: "bar.two.>",
},
{
Source: "baz.one.>",
Destination: "baz.two.>",
},
},
})
}
7 changes: 7 additions & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2893,6 +2893,13 @@ func (mset *stream) setSourceConsumer(iname string, seq uint64, startTime time.T
if req.Config.FilterSubject != _EMPTY_ {
req.Config.Name = fmt.Sprintf("src-%s", createConsumerName())
subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject)
} else if len(req.Config.FilterSubjects) == 1 {
req.Config.Name = fmt.Sprintf("src-%s", createConsumerName())
// It is necessary to switch to using FilterSubject here as the extended consumer
// create API checks for it, so as to not accidentally allow multiple filtered subjects.
req.Config.FilterSubject = req.Config.FilterSubjects[0]
req.Config.FilterSubjects = nil
subject = fmt.Sprintf(JSApiConsumerCreateExT, si.name, req.Config.Name, req.Config.FilterSubject)
} else {
subject = fmt.Sprintf(JSApiConsumerCreateT, si.name)
}
Expand Down

0 comments on commit 81c0a14

Please sign in to comment.