Skip to content

Commit

Permalink
Removes the single subject transform dest field from StreamSource
Browse files Browse the repository at this point in the history
Co-authored-by: Jean-Noël Moyne <[email protected]>
Co-authored-by: Neil Twigg <[email protected]>

Signed-off-by: Neil Twigg <[email protected]>
  • Loading branch information
jnmoyne authored and neilalexander committed Sep 20, 2023
1 parent 81c0a14 commit 9fc2603
Showing 1 changed file with 35 additions and 80 deletions.
115 changes: 35 additions & 80 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,25 +180,23 @@ type PeerInfo struct {

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"-"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
Name string `json:"name"`
External *ExternalStream `json:"external,omitempty"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Error *ApiError `json:"error,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"-"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`

// Internal
iname string // For indexing when stream names are the same for multiple sources.
Expand Down Expand Up @@ -313,8 +311,7 @@ type sourceInfo struct {
qch chan struct{}
sip bool // setup in progress
wg sync.WaitGroup
sf string // subject filter
tr *subjectTransform
sf string // subject filter
sfs []string // subject filters
trs []*subjectTransform // subject transforms
}
Expand Down Expand Up @@ -477,12 +474,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the mirror %w", cfg.Mirror.FilterSubject, ErrBadSubject)
}
if cfg.Mirror.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the mirror %w", cfg.Mirror.FilterSubject, cfg.Mirror.SubjectTransformDest, err)
}
}
} else {
for _, st := range cfg.Mirror.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
Expand All @@ -508,13 +499,6 @@ func (a *Account) addStreamWithAssignment(config *StreamConfig, fsConfig *FileSt
jsa.mu.Unlock()
return nil, fmt.Errorf("subject filter '%s' for the source: %w", ssi.FilterSubject, ErrBadSubject)
}
// check the transform, if any, is valid
if ssi.SubjectTransformDest != _EMPTY_ {
if _, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest); err != nil {
jsa.mu.Unlock()
return nil, fmt.Errorf("subject transform from '%s' to '%s' for the source: %w", ssi.FilterSubject, ssi.SubjectTransformDest, err)
}
}
} else {
for _, st := range ssi.SubjectTransforms {
if st.Source != _EMPTY_ && !IsValidSubject(st.Source) {
Expand Down Expand Up @@ -703,7 +687,7 @@ func (ssi *StreamSource) composeIName() string {
}

source := ssi.FilterSubject
destination := ssi.SubjectTransformDest
destination := fwcs

if len(ssi.SubjectTransforms) == 0 {
// normalize filter and destination in case they are empty
Expand Down Expand Up @@ -1271,7 +1255,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
if len(cfg.Sources) > 0 {
return StreamConfig{}, NewJSMirrorWithSourcesError()
}
if (cfg.Mirror.FilterSubject != _EMPTY_ || cfg.Mirror.SubjectTransformDest != _EMPTY_) && len(cfg.Mirror.SubjectTransforms) != 0 {
if cfg.Mirror.FilterSubject != _EMPTY_ && len(cfg.Mirror.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSMirrorMultipleFiltersNotAllowedError()
}
// Check subject filters overlap.
Expand Down Expand Up @@ -1351,7 +1335,7 @@ func (s *Server) checkStreamCfg(config *StreamConfig, acc *Account) (StreamConfi
}
}

if (src.FilterSubject != _EMPTY_ || src.SubjectTransformDest != _EMPTY_) && len(src.SubjectTransforms) != 0 {
if src.FilterSubject != _EMPTY_ && len(src.SubjectTransforms) != 0 {
return StreamConfig{}, NewJSSourceMultipleFiltersNotAllowedError()
}

Expand Down Expand Up @@ -1787,12 +1771,6 @@ func (mset *stream) updateWithAdvisory(config *StreamConfig, sendAdvisory bool)

if len(s.SubjectTransforms) == 0 {
si = &sourceInfo{name: s.Name, iname: s.iname, sf: s.FilterSubject}
// set for transform if any
var err error
if si.tr, err = NewSubjectTransform(s.FilterSubject, s.SubjectTransformDest); err != nil {
mset.mu.Unlock()
return fmt.Errorf("stream source subject transform from '%s' to '%s': %w", s.FilterSubject, s.SubjectTransformDest, err)
}
} else {
si = &sourceInfo{name: s.Name, iname: s.iname}
si.trs = make([]*subjectTransform, len(s.SubjectTransforms))
Expand Down Expand Up @@ -2073,10 +2051,6 @@ func (mset *stream) sourceInfo(si *sourceInfo) *StreamSourceInfo {

var ssi = StreamSourceInfo{Name: si.name, Lag: si.lag, Error: si.err, FilterSubject: si.sf}

if si.tr != nil {
ssi.SubjectTransformDest = si.tr.dest
}

trConfigs := make([]SubjectTransformConfig, len(si.sfs))
for i := range si.sfs {
destination := _EMPTY_
Expand Down Expand Up @@ -2277,18 +2251,15 @@ func (mset *stream) processInboundMirrorMsg(m *inMsg) bool {
}

// Do the subject transform if there's one
if mset.mirror.tr != nil {
m.subj = mset.mirror.tr.TransformSubject(m.subj)
} else {
for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}

for _, tr := range mset.mirror.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
Expand Down Expand Up @@ -2506,12 +2477,6 @@ func (mset *stream) setupMirrorConsumer() error {
if mset.cfg.Mirror.FilterSubject != _EMPTY_ {
req.Config.FilterSubject = mset.cfg.Mirror.FilterSubject
mirror.sf = mset.cfg.Mirror.FilterSubject
// Set transform if any
var err error
mirror.tr, err = NewSubjectTransform(mset.cfg.Mirror.FilterSubject, mset.cfg.Mirror.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get transform for mirror consumer: %v", err)
}
}

sfs := make([]string, len(mset.cfg.Mirror.SubjectTransforms))
Expand Down Expand Up @@ -3173,18 +3138,15 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))

// Do the subject transform for the source if there's one
if si.tr != nil {
m.subj = si.tr.TransformSubject(m.subj)
} else {
for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}

for _, tr := range si.trs {
if tr == nil {
continue
} else {
tsubj, err := tr.Match(m.subj)
if err == nil {
m.subj = tsubj
break
}
}
}
Expand Down Expand Up @@ -3345,13 +3307,6 @@ func (mset *stream) startingSequenceForSources() {

if len(ssi.SubjectTransforms) == 0 {
si = &sourceInfo{name: ssi.Name, iname: ssi.iname, sf: ssi.FilterSubject}
// Set the transform if any
// technically no need to check the error as already validated that it will not before
var err error
si.tr, err = NewSubjectTransform(ssi.FilterSubject, ssi.SubjectTransformDest)
if err != nil {
mset.srv.Errorf("Unable to get subject transform for source: %v", err)
}
} else {
sfs := make([]string, len(ssi.SubjectTransforms))
trs := make([]*subjectTransform, len(ssi.SubjectTransforms))
Expand Down

0 comments on commit 9fc2603

Please sign in to comment.