From d344a2683a4eee81d8f3aea3d45011af540c5c14 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-No=C3=ABl=20Moyne?= Date: Mon, 18 Sep 2023 16:45:14 -0700 Subject: [PATCH] Removes the single subject transform dest field from StreamSource Add checking and reject if the stream mirror/source config contains the now unused single subject transform destination --- server/jetstream_test.go | 6 +- server/stream.go | 117 ++++++++++++++------------------------- 2 files changed, 44 insertions(+), 79 deletions(-) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 08c9ea253f0..46bc5aa9fc1 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -11575,7 +11575,7 @@ func TestJetStreamMirrorBasics(t *testing.T) { createStreamServerStreamConfig(&StreamConfig{ Name: "M5", Storage: FileStorage, - Mirror: &StreamSource{Name: "S1", FilterSubject: "foo", SubjectTransformDest: "foo2"}, + Mirror: &StreamSource{Name: "S1", SubjectTransforms: []SubjectTransformConfig{{Source: "foo", Destination: "foo2"}}}, }, 0) createStreamServerStreamConfig(&StreamConfig{ @@ -11688,7 +11688,7 @@ func TestJetStreamSourceBasics(t *testing.T) { Name: "MS", Storage: FileStorage, Sources: []*StreamSource{ - {Name: "foo", SubjectTransformDest: "foo2.>"}, + {Name: "foo", SubjectTransforms: []SubjectTransformConfig{{Source: ">", Destination: "foo2.>"}}}, {Name: "bar"}, {Name: "baz"}, }, @@ -11770,7 +11770,7 @@ func TestJetStreamSourceBasics(t *testing.T) { Name: "FMS2", Storage: FileStorage, Sources: []*StreamSource{ - {Name: "TEST", OptStartSeq: 11, FilterSubject: "dlc", SubjectTransformDest: "dlc2"}, + {Name: "TEST", OptStartSeq: 11, SubjectTransforms: []SubjectTransformConfig{{Source: "dlc", Destination: "dlc2"}}}, }, } createStream(cfg) diff --git a/server/stream.go b/server/stream.go index d66f60620fe..998f10c25ba 100644 --- a/server/stream.go +++ b/server/stream.go @@ -180,22 +180,24 @@ type PeerInfo struct { // StreamSourceInfo shows information about an upstream stream source. type StreamSourceInfo struct { - Name string `json:"name"` - External *ExternalStream `json:"external,omitempty"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` - Error *ApiError `json:"error,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransformDest string `json:"subject_transform_dest,omitempty"` - SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + Name string `json:"name"` + External *ExternalStream `json:"external,omitempty"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + Error *ApiError `json:"error,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + // Do not expose SubjectTransformDest in the client libraries + // it is here because it was enabled for a while in the 2.10 beta but is now removed + // and there is now a check to error out if it is provided in the request SubjectTransformDest string `json:"subject_transform_dest,omitempty"` SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` External *ExternalStream `json:"external,omitempty"` @@ -313,8 +315,7 @@ type sourceInfo struct { qch chan struct{} sip bool // setup in progress wg sync.WaitGroup - sf string // subject filter - tr *subjectTransform + sf string // subject filter sfs []string // subject filters trs []*subjectTransform // subject transforms } @@ -477,12 +478,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt jsa.mu.Unlock() return nil, fmt.Errorf("subject filter '%s' for the mirror %w", cfg.Mirror.FilterSubject, ErrBadSubject) } - if cfg.Mirror.SubjectTransformDest != _EMPTY_ { - if _, err = NewSubjectTransform(cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest); err != nil { - jsa.mu.Unlock() - return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror %w", cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest, err) - } - } } else { for _, st := range cfg.Mirror.SubjectTransforms { if st.Source != _EMPTY_ && !IsValidSubject(st.Source) { @@ -508,13 +503,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt jsa.mu.Unlock() return nil, fmt.Errorf("subject filter '%s' for the source: %w", ssi.FilterSubject, ErrBadSubject) } - // check the transform, if any, is valid - if ssi.SubjectTransformDest != _EMPTY_ { - if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil { - jsa.mu.Unlock() - return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", ssi.FilterSubject, ssi.SubjectTransformDest, err) - } - } } else { for _, st := range ssi.SubjectTransforms { if st.Source != _EMPTY_ && !IsValidSubject(st.Source) { @@ -703,7 +691,7 @@ func (ssi *StreamSource) composeIName() string { } source := ssi.FilterSubject - destination := ssi.SubjectTransformDest + destination := fwcs if len(ssi.SubjectTransforms) == 0 { // normalize filter and destination in case they are empty @@ -1269,6 +1257,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if len(cfg.Sources) > 0 { return StreamConfig{}, NewJSMirrorWithSourcesError() } + if cfg.Mirror.SubjectTransformDest != _EMPTY_ { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream mirror can not have a single subject_transform_destination")) + } if (cfg.Mirror.FilterSubject != _EMPTY_ || cfg.Mirror.SubjectTransformDest != _EMPTY_) && len(cfg.Mirror.SubjectTransforms) != 0 { return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError() } @@ -1331,6 +1322,9 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if !isValidName(src.Name) { return StreamConfig{}, NewJSSourceInvalidStreamNameError() } + if src.SubjectTransformDest != _EMPTY_ { + return StreamConfig{}, NewJSStreamInvalidConfigError(fmt.Errorf("stream sources can not have a single subject_transform_destination")) + } if _, ok := iNames[src.composeIName()]; !ok { iNames[src.composeIName()] = struct{}{} } else { @@ -1349,7 +1343,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } } - if (src.FilterSubject != _EMPTY_ || src.SubjectTransformDest != _EMPTY_) && len(src.SubjectTransforms) != 0 { + if src.FilterSubject != _EMPTY_ && len(src.SubjectTransforms) != 0 { return StreamConfig{}, NewJSSourceMultipleFiltersNotAllowedError() } @@ -1785,12 +1779,6 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) if len(s.SubjectTransforms) == 0 { si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject} - // set for transform if any - var err error - if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { - mset.mu.Unlock() - return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err) - } } else { si = &sourceInfo{name: s.Name, iname: s.iname} si.trs = make([]*subjectTransform, len(s.SubjectTransforms)) @@ -2071,10 +2059,6 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo { var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf} - if si.tr != nil { - ssi.SubjectTransformDest = si.tr.dest - } - trConfigs := make([]SubjectTransformConfig, len(si.sfs)) for i := range si.sfs { destination := _EMPTY_ @@ -2275,18 +2259,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { } // Do the subject transform if there's one - if mset.mirror.tr != nil { - m.subj = mset.mirror.tr.TransformSubject(m.subj) - } else { - for _, tr := range mset.mirror.trs { - if tr == nil { - continue - } else { - tsubj, err := tr.Match(m.subj) - if err == nil { - m.subj = tsubj - break - } + + for _, tr := range mset.mirror.trs { + if tr == nil { + continue + } else { + tsubj, err := tr.Match(m.subj) + if err == nil { + m.subj = tsubj + break } } } @@ -2504,12 +2485,6 @@ func (mset *stream) setupMirrorConsumer() error { if mset.cfg.Mirror.FilterSubject != _EMPTY_ { req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject mirror.sf = mset.cfg.Mirror.FilterSubject - // Set transform if any - var err error - mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest) - if err != nil { - mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) - } } sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms)) @@ -3164,18 +3139,15 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply)) // Do the subject transform for the source if there's one - if si.tr != nil { - m.subj = si.tr.TransformSubject(m.subj) - } else { - for _, tr := range si.trs { - if tr == nil { - continue - } else { - tsubj, err := tr.Match(m.subj) - if err == nil { - m.subj = tsubj - break - } + + for _, tr := range si.trs { + if tr == nil { + continue + } else { + tsubj, err := tr.Match(m.subj) + if err == nil { + m.subj = tsubj + break } } } @@ -3336,13 +3308,6 @@ func (mset *stream) startingSequenceForSources() { if len(ssi.SubjectTransforms) == 0 { si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject} - // Set the transform if any - // technically no need to check the error as already validated that it will not before - var err error - si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) - if err != nil { - mset.srv.Errorf("Unable to get subject transform for source: %v", err) - } } else { sfs := make([]string, len(ssi.SubjectTransforms)) trs := make([]*subjectTransform, len(ssi.SubjectTransforms))