diff --git a/server/stream.go b/server/stream.go index 43f8f6485c2..2180dc960a3 100644 --- a/server/stream.go +++ b/server/stream.go @@ -180,25 +180,23 @@ type PeerInfo struct { // StreamSourceInfo shows information about an upstream stream source. type StreamSourceInfo struct { - Name string `json:"name"` - External *ExternalStream `json:"external,omitempty"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` - Error *ApiError `json:"error,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransformDest string `json:"-"` - SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + Name string `json:"name"` + External *ExternalStream `json:"external,omitempty"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + Error *ApiError `json:"error,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransformDest string `json:"-"` - SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` - External *ExternalStream `json:"external,omitempty"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` // Internal iname string // For indexing when stream names are the same for multiple sources. @@ -313,8 +311,7 @@ type sourceInfo struct { qch chan struct{} sip bool // setup in progress wg sync.WaitGroup - sf string // subject filter - tr *subjectTransform + sf string // subject filter sfs []string // subject filters trs []*subjectTransform // subject transforms } @@ -477,12 +474,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt jsa.mu.Unlock() return nil, fmt.Errorf("subject filter '%s' for the mirror %w", cfg.Mirror.FilterSubject, ErrBadSubject) } - if cfg.Mirror.SubjectTransformDest != _EMPTY_ { - if _, err = NewSubjectTransform(cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest); err != nil { - jsa.mu.Unlock() - return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror %w", cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest, err) - } - } } else { for _, st := range cfg.Mirror.SubjectTransforms { if st.Source != _EMPTY_ && !IsValidSubject(st.Source) { @@ -508,13 +499,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt jsa.mu.Unlock() return nil, fmt.Errorf("subject filter '%s' for the source: %w", ssi.FilterSubject, ErrBadSubject) } - // check the transform, if any, is valid - if ssi.SubjectTransformDest != _EMPTY_ { - if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil { - jsa.mu.Unlock() - return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", ssi.FilterSubject, ssi.SubjectTransformDest, err) - } - } } else { for _, st := range ssi.SubjectTransforms { if st.Source != _EMPTY_ && !IsValidSubject(st.Source) { @@ -703,7 +687,7 @@ func (ssi *StreamSource) composeIName() string { } source := ssi.FilterSubject - destination := ssi.SubjectTransformDest + destination := fwcs if len(ssi.SubjectTransforms) == 0 { // normalize filter and destination in case they are empty @@ -1271,7 +1255,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi if len(cfg.Sources) > 0 { return StreamConfig{}, NewJSMirrorWithSourcesError() } - if (cfg.Mirror.FilterSubject != _EMPTY_ || cfg.Mirror.SubjectTransformDest != _EMPTY_) && len(cfg.Mirror.SubjectTransforms) != 0 { + if cfg.Mirror.FilterSubject != _EMPTY_ && len(cfg.Mirror.SubjectTransforms) != 0 { return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError() } // Check subject filters overlap. @@ -1351,7 +1335,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi } } - if (src.FilterSubject != _EMPTY_ || src.SubjectTransformDest != _EMPTY_) && len(src.SubjectTransforms) != 0 { + if src.FilterSubject != _EMPTY_ && len(src.SubjectTransforms) != 0 { return StreamConfig{}, NewJSSourceMultipleFiltersNotAllowedError() } @@ -1787,12 +1771,6 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool) if len(s.SubjectTransforms) == 0 { si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject} - // set for transform if any - var err error - if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil { - mset.mu.Unlock() - return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err) - } } else { si = &sourceInfo{name: s.Name, iname: s.iname} si.trs = make([]*subjectTransform, len(s.SubjectTransforms)) @@ -2073,10 +2051,6 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo { var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf} - if si.tr != nil { - ssi.SubjectTransformDest = si.tr.dest - } - trConfigs := make([]SubjectTransformConfig, len(si.sfs)) for i := range si.sfs { destination := _EMPTY_ @@ -2277,18 +2251,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool { } // Do the subject transform if there's one - if mset.mirror.tr != nil { - m.subj = mset.mirror.tr.TransformSubject(m.subj) - } else { - for _, tr := range mset.mirror.trs { - if tr == nil { - continue - } else { - tsubj, err := tr.Match(m.subj) - if err == nil { - m.subj = tsubj - break - } + + for _, tr := range mset.mirror.trs { + if tr == nil { + continue + } else { + tsubj, err := tr.Match(m.subj) + if err == nil { + m.subj = tsubj + break } } } @@ -2506,12 +2477,6 @@ func (mset *stream) setupMirrorConsumer() error { if mset.cfg.Mirror.FilterSubject != _EMPTY_ { req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject mirror.sf = mset.cfg.Mirror.FilterSubject - // Set transform if any - var err error - mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest) - if err != nil { - mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err) - } } sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms)) @@ -3173,18 +3138,15 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool { hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply)) // Do the subject transform for the source if there's one - if si.tr != nil { - m.subj = si.tr.TransformSubject(m.subj) - } else { - for _, tr := range si.trs { - if tr == nil { - continue - } else { - tsubj, err := tr.Match(m.subj) - if err == nil { - m.subj = tsubj - break - } + + for _, tr := range si.trs { + if tr == nil { + continue + } else { + tsubj, err := tr.Match(m.subj) + if err == nil { + m.subj = tsubj + break } } } @@ -3345,13 +3307,6 @@ func (mset *stream) startingSequenceForSources() { if len(ssi.SubjectTransforms) == 0 { si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject} - // Set the transform if any - // technically no need to check the error as already validated that it will not before - var err error - si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest) - if err != nil { - mset.srv.Errorf("Unable to get subject transform for source: %v", err) - } } else { sfs := make([]string, len(ssi.SubjectTransforms)) trs := make([]*subjectTransform, len(ssi.SubjectTransforms))