Skip to content

Commit

Permalink
Removes the single subject transform dest field from StreamSource
Browse files Browse the repository at this point in the history
Add checking and reject if the stream mirror/source config contains the now unused single subject transform destination
  • Loading branch information
jnmoyne committed Sep 19, 2023
1 parent 3dc0627 commit d344a26
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 79 deletions.
6 changes: 3 additions & 3 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11575,7 +11575,7 @@ func TestJetStreamMirrorBasics(t *testing.T) {
createStreamServerStreamConfig(&StreamConfig{
Name: "M5",
Storage: FileStorage,
Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransformDest: "foo2"},
Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: "foo2"}}},
}, 0)

createStreamServerStreamConfig(&StreamConfig{
Expand Down Expand Up @@ -11688,7 +11688,7 @@ func TestJetStreamSourceBasics(t *testing.T) {
Name: "MS",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "foo", SubjectTransformDest: "foo2.>"},
{Name: "foo", SubjectTransforms: []SubjectTransformConfig{{Source: ">", Destination: "foo2.>"}}},
{Name: "bar"},
{Name: "baz"},
},
Expand Down Expand Up @@ -11770,7 +11770,7 @@ func TestJetStreamSourceBasics(t *testing.T) {
Name: "FMS2",
Storage: FileStorage,
Sources: []*StreamSource{
{Name: "TEST", OptStartSeq: 11, FilterSubject: "dlc", SubjectTransformDest: "dlc2"},
{Name: "TEST", OptStartSeq: 11, SubjectTransforms: []SubjectTransformConfig{{Source: "dlc", Destination: "dlc2"}}},
},
}
createStream(cfg)
Expand Down
117 changes: 41 additions & 76 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,22 +180,24 @@ type PeerInfo struct {

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
// Do not expose SubjectTransformDest in the client libraries
// it is here because it was enabled for a while in the 2.10 beta but is now removed
// and there is now a check to error out if it is provided in the request
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Expand Down Expand Up @@ -313,8 +315,7 @@ type sourceInfo struct {
qch chan struct{}
sip bool // setup in progress
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sf string // subject filter
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}
Expand Down Expand Up @@ -477,12 +478,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the mirror %w", cfg.Mirror.FilterSubject, ErrBadSubject)
}
if cfg.Mirror.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror %w", cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest, err)
}
}
} else {
for _, st := range cfg.Mirror.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
Expand All @@ -508,13 +503,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source: %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
} else {
for _, st := range ssi.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
Expand Down Expand Up @@ -703,7 +691,7 @@ func (ssi *StreamSource) composeIName() string {
}

source := ssi.FilterSubject
destination := ssi.SubjectTransformDest
destination := fwcs

if len(ssi.SubjectTransforms) == 0 {
// normalize filter and destination in case they are empty
Expand Down Expand Up @@ -1269,6 +1257,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if len(cfg.Sources) > 0 {
return StreamConfig{}, NewJSMirrorWithSourcesError()
}
if cfg.Mirror.SubjectTransformDest != _EMPTY_ {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream mirror can not have a single subject_transform_destination"))
}
if (cfg.Mirror.FilterSubject != _EMPTY_ || cfg.Mirror.SubjectTransformDest != _EMPTY_) && len(cfg.Mirror.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError()
}
Expand Down Expand Up @@ -1331,6 +1322,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if !isValidName(src.Name) {
return StreamConfig{}, NewJSSourceInvalidStreamNameError()
}
if src.SubjectTransformDest != _EMPTY_ {
return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream sources can not have a single subject_transform_destination"))
}
if _, ok := iNames[src.composeIName()]; !ok {
iNames[src.composeIName()] = struct{}{}
} else {
Expand All @@ -1349,7 +1343,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
}

if (src.FilterSubject != _EMPTY_ || src.SubjectTransformDest != _EMPTY_) && len(src.SubjectTransforms) != 0 {
if src.FilterSubject != _EMPTY_ && len(src.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSSourceMultipleFiltersNotAllowedError()
}

Expand Down Expand Up @@ -1785,12 +1779,6 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

if len(s.SubjectTransforms) == 0 {
si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
// set for transform if any
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
}
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
Expand Down Expand Up @@ -2071,10 +2059,6 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {

var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}

if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}

trConfigs := make([]SubjectTransformConfig, len(si.sfs))
for i := range si.sfs {
destination := _EMPTY_
Expand Down Expand Up @@ -2275,18 +2259,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
}

// Do the subject transform if there's one
if mset.mirror.tr != nil {
m.subj = mset.mirror.tr.TransformSubject(m.subj)
} else {
for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}

for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
Expand Down Expand Up @@ -2504,12 +2485,6 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
mirror.sf = mset.cfg.Mirror.FilterSubject
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
}

sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
Expand Down Expand Up @@ -3164,18 +3139,15 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))

// Do the subject transform for the source if there's one
if si.tr != nil {
m.subj = si.tr.TransformSubject(m.subj)
} else {
for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}

for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
Expand Down Expand Up @@ -3336,13 +3308,6 @@ func (mset *stream) startingSequenceForSources() {

if len(ssi.SubjectTransforms) == 0 {
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Set the transform if any
// technically no need to check the error as already validated that it will not before
var err error
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
sfs := make([]string, len(ssi.SubjectTransforms))
trs := make([]*subjectTransform, len(ssi.SubjectTransforms))
Expand Down

0 comments on commit d344a26

Please sign in to comment.