Skip to content

Commit

Permalink
[ADDED] Client support for multi-subject transforms in sources and mi…
Browse files Browse the repository at this point in the history
…rrors (#1359)
  • Loading branch information
jnmoyne authored and piotrpio committed Sep 1, 2023
1 parent 4f3bd4e commit 0f9c6e9
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 30 deletions.
11 changes: 6 additions & 5 deletions go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.16.5
github.com/nats-io/nats-server/v2 v2.9.19
github.com/klauspost/compress v1.16.7
github.com/nats-io/nats-server/v2 v2.9.22-0.20230809202110-5c538671f7b6
github.com/nats-io/nkeys v0.4.4
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.9.0
golang.org/x/text v0.12.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.4.1 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/sys v0.8.0 // indirect
go.uber.org/automaxprocs v1.5.3 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/time v0.3.0 // indirect
)
16 changes: 14 additions & 2 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,39 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI=
github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4=
github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.9.19 h1:OF9jSKZGo425C/FcVVIvNgpd36CUe7aVTTXEZRJk6kA=
github.com/nats-io/nats-server/v2 v2.9.19/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats-server/v2 v2.9.20-0.20230724215505-4a765b6ef283 h1:9DBdRSOLZFaHmF/xPy7PvVk5Slxm3i1B141QtgPF2j0=
github.com/nats-io/nats-server/v2 v2.9.20-0.20230724215505-4a765b6ef283/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw=
github.com/nats-io/nats-server/v2 v2.9.22-0.20230809202110-5c538671f7b6 h1:FrREvhtoZ255dLeg5/h8o+0zzdM00OhUtlqE3ZsvEdg=
github.com/nats-io/nats-server/v2 v2.9.22-0.20230809202110-5c538671f7b6/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA=
golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA=
golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
20 changes: 20 additions & 0 deletions jetstream/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,26 @@ var (
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream subject transform. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

// ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream source subject transform. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

// ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"}

// ErrStreamSourceMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing CreateStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject filters not supported by nats-server"}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

Expand Down
38 changes: 38 additions & 0 deletions jetstream/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,25 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, resp.Error
}

// check that input subject transform (if used) is reflected in the returned StreamInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
}
}

return &stream{
jetStream: js,
name: cfg.Name,
Expand Down Expand Up @@ -413,6 +432,25 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream
return nil, resp.Error
}

// check that input subject transform (if used) is reflected in the returned StreamInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported
}
}
}

return &stream{
jetStream: js,
name: cfg.Name,
Expand Down
32 changes: 23 additions & 9 deletions jetstream/stream_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type (
DenyPurge bool `json:"deny_purge,omitempty"`
AllowRollup bool `json:"allow_rollup_hdrs,omitempty"`

// Allow applying a subject transform to incoming messages before doing anything else
SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"`

// Allow republish of the message after being sequenced and stored.
RePublish *RePublish `json:"republish,omitempty"`

Expand All @@ -71,9 +74,12 @@ type (

// StreamSourceInfo shows information about an upstream stream source.
StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamState is information about the given stream.
Expand Down Expand Up @@ -109,6 +115,12 @@ type (
Lag uint64 `json:"lag,omitempty"`
}

// SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received
SubjectTransformConfig struct {
Source string `json:"src"`
Destination string `json:"dest"`
}

// RePublish is for republishing messages once committed to a stream. The original
// subject is remapped from the subject pattern to the destination pattern.
RePublish struct {
Expand All @@ -125,12 +137,14 @@ type (

// StreamSource dictates how streams can source from other streams.
StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
Expand Down
20 changes: 20 additions & 0 deletions jserrors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ var (
// ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration.
ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}}

// ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

// ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting
// the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"}

// ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"}

// ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting
// the stream sources. If this error is returned when executing AddStream(), the stream with invalid
// configuration was already created in the server.
ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourceing with multiple subject transforms not supported by nats-server"}

// ErrConsumerNotFound is an error returned when consumer with given name does not exist.
ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}}

Expand Down
68 changes: 54 additions & 14 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,14 @@ type Placement struct {

// StreamSource dictates how streams can source from other streams.
type StreamSource struct {
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
Name string `json:"name"`
OptStartSeq uint64 `json:"opt_start_seq,omitempty"`
OptStartTime *time.Time `json:"opt_start_time,omitempty"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
External *ExternalStream `json:"external,omitempty"`
Domain string `json:"-"`
}

// ExternalStream allows you to qualify access to a stream source in another
Expand Down Expand Up @@ -795,6 +796,24 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) {
return nil, resp.Error
}

// check that input subject transform (if used) is reflected in the returned ConsumerInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}
if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != _EMPTY_ && resp.Sources[i].SubjectTransformDest == _EMPTY_ {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
}
}
}

return resp.StreamInfo, nil
}

Expand Down Expand Up @@ -912,13 +931,14 @@ type StreamAlternate struct {

// StreamSourceInfo shows information about an upstream stream source.
type StreamSourceInfo struct {
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
Name string `json:"name"`
Lag uint64 `json:"lag"`
Active time.Duration `json:"active"`
External *ExternalStream `json:"external"`
Error *APIError `json:"error"`
FilterSubject string `json:"filter_subject,omitempty"`
SubjectTransformDest string `json:"subject_transform_dest,omitempty"`
SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"`
}

// StreamState is information about the given stream.
Expand Down Expand Up @@ -990,6 +1010,26 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error
}
return nil, resp.Error
}

// check that input subject transform (if used) is reflected in the returned StreamInfo
if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil {
return nil, ErrStreamSubjectTransformNotSupported
}

if len(cfg.Sources) != 0 {
if len(cfg.Sources) != len(resp.Sources) {
return nil, ErrStreamSourceNotSupported
}
for i := range cfg.Sources {
if cfg.Sources[i].SubjectTransformDest != _EMPTY_ && resp.Sources[i].SubjectTransformDest == _EMPTY_ {
return nil, ErrStreamSourceSubjectTransformNotSupported
}
if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 {
return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported
}
}
}

return resp.StreamInfo, nil
}

Expand Down

0 comments on commit 0f9c6e9

Please sign in to comment.