diff --git a/go_test.mod b/go_test.mod index af69c5ab2..bb15a438a 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,19 +4,20 @@ go 1.19 require ( github.com/golang/protobuf v1.4.2 - github.com/klauspost/compress v1.16.5 - github.com/nats-io/nats-server/v2 v2.9.19 + github.com/klauspost/compress v1.16.7 + github.com/nats-io/nats-server/v2 v2.9.22-0.20230809202110-5c538671f7b6 github.com/nats-io/nkeys v0.4.4 github.com/nats-io/nuid v1.0.1 go.uber.org/goleak v1.2.1 - golang.org/x/text v0.9.0 + golang.org/x/text v0.12.0 google.golang.org/protobuf v1.23.0 ) require ( github.com/minio/highwayhash v1.0.2 // indirect github.com/nats-io/jwt/v2 v2.4.1 // indirect - golang.org/x/crypto v0.9.0 // indirect - golang.org/x/sys v0.8.0 // indirect + go.uber.org/automaxprocs v1.5.3 // indirect + golang.org/x/crypto v0.11.0 // indirect + golang.org/x/sys v0.10.0 // indirect golang.org/x/time v0.3.0 // indirect ) diff --git a/go_test.sum b/go_test.sum index b9c9fd476..773433f18 100644 --- a/go_test.sum +++ b/go_test.sum @@ -12,27 +12,39 @@ github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/klauspost/compress v1.16.5 h1:IFV2oUNUzZaz+XyusxpLzpzS8Pt5rh0Z16For/djlyI= github.com/klauspost/compress v1.16.5/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.4.1 h1:Y35W1dgbbz2SQUYDPCaclXcuqleVmpbRa7646Jf2EX4= github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI= -github.com/nats-io/nats-server/v2 v2.9.19 h1:OF9jSKZGo425C/FcVVIvNgpd36CUe7aVTTXEZRJk6kA= -github.com/nats-io/nats-server/v2 v2.9.19/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= +github.com/nats-io/nats-server/v2 v2.9.20-0.20230724215505-4a765b6ef283 h1:9DBdRSOLZFaHmF/xPy7PvVk5Slxm3i1B141QtgPF2j0= +github.com/nats-io/nats-server/v2 v2.9.20-0.20230724215505-4a765b6ef283/go.mod h1:aTb/xtLCGKhfTFLxP591CMWfkdgBmcUUSkiSOe5A3gw= +github.com/nats-io/nats-server/v2 v2.9.22-0.20230809202110-5c538671f7b6 h1:FrREvhtoZ255dLeg5/h8o+0zzdM00OhUtlqE3ZsvEdg= +github.com/nats-io/nats-server/v2 v2.9.22-0.20230809202110-5c538671f7b6/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU= github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA= github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= +go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g= golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0= +golang.org/x/crypto v0.11.0 h1:6Ewdq3tDic1mg5xRO4milcWCfMVQhI4NkqWWvqejpuA= +golang.org/x/crypto v0.11.0/go.mod h1:xgJhtzW8F9jGdVFWZESrid1U1bjeNy4zgy5cRr/CIio= golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.10.0 h1:SqMFp9UcQJZa+pmYuAKjd9xq1f0j5rLcDIk0mj4qAsA= +golang.org/x/sys v0.10.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0 h1:k+n5B8goJNdU7hSvEtMUz3d1Q6D/XW4COJSJR6fN0mc= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/jetstream/errors.go b/jetstream/errors.go index 90ae07163..116c3bc18 100644 --- a/jetstream/errors.go +++ b/jetstream/errors.go @@ -79,6 +79,26 @@ var ( // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} + // ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream subject transform. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream source subject transform. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"} + + // ErrStreamSourceMultipleFilterSubjectsNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing CreateStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceMultipleFilterSubjectsNotSupported JetStreamError = &jsError{message: "stream sourcing with multiple subject filters not supported by nats-server"} + // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 1ff86c336..10a2bb9b9 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -350,6 +350,25 @@ func (js *jetStream) CreateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, resp.Error } + // check that input subject transform (if used) is reflected in the returned StreamInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" { + return nil, ErrStreamSourceSubjectTransformNotSupported + } + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported + } + } + } + return &stream{ jetStream: js, name: cfg.Name, @@ -413,6 +432,25 @@ func (js *jetStream) UpdateStream(ctx context.Context, cfg StreamConfig) (Stream return nil, resp.Error } + // check that input subject transform (if used) is reflected in the returned StreamInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if cfg.Sources[i].SubjectTransformDest != "" && resp.Sources[i].SubjectTransformDest == "" { + return nil, ErrStreamSourceSubjectTransformNotSupported + } + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleFilterSubjectsNotSupported + } + } + } + return &stream{ jetStream: js, name: cfg.Name, diff --git a/jetstream/stream_config.go b/jetstream/stream_config.go index 9b5461df0..67d58f9b2 100644 --- a/jetstream/stream_config.go +++ b/jetstream/stream_config.go @@ -60,6 +60,9 @@ type ( DenyPurge bool `json:"deny_purge,omitempty"` AllowRollup bool `json:"allow_rollup_hdrs,omitempty"` + // Allow applying a subject transform to incoming messages before doing anything else + SubjectTransform *SubjectTransformConfig `json:"subject_transform,omitempty"` + // Allow republish of the message after being sequenced and stored. RePublish *RePublish `json:"republish,omitempty"` @@ -71,9 +74,12 @@ type ( // StreamSourceInfo shows information about an upstream stream source. StreamSourceInfo struct { - Name string `json:"name"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` + Name string `json:"name"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransformDest string `json:"subject_transform_dest,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamState is information about the given stream. @@ -109,6 +115,12 @@ type ( Lag uint64 `json:"lag,omitempty"` } + // SubjectTransformConfig is for applying a subject transform (to matching messages) before doing anything else when a new message is received + SubjectTransformConfig struct { + Source string `json:"src"` + Destination string `json:"dest"` + } + // RePublish is for republishing messages once committed to a stream. The original // subject is remapped from the subject pattern to the destination pattern. RePublish struct { @@ -125,12 +137,14 @@ type ( // StreamSource dictates how streams can source from other streams. StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - External *ExternalStream `json:"external,omitempty"` - Domain string `json:"-"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransformDest string `json:"subject_transform_dest,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + Domain string `json:"-"` } // ExternalStream allows you to qualify access to a stream source in another diff --git a/jserrors.go b/jserrors.go index d846b666b..c8b1f5fc6 100644 --- a/jserrors.go +++ b/jserrors.go @@ -33,6 +33,26 @@ var ( // ErrStreamNameAlreadyInUse is returned when a stream with given name already exists and has a different configuration. ErrStreamNameAlreadyInUse JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeStreamNameInUse, Description: "stream name already in use", Code: 400}} + // ErrStreamSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream subject transform. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceSubjectTransformNotSupported is returned when the connected nats-server version does not support setting + // the stream source subject transform. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceSubjectTransformNotSupported JetStreamError = &jsError{message: "stream subject transformation not supported by nats-server"} + + // ErrStreamSourceNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceNotSupported JetStreamError = &jsError{message: "stream sourcing is not supported by nats-server"} + + // ErrStreamSourceMultipleSubjectTransformsNotSupported is returned when the connected nats-server version does not support setting + // the stream sources. If this error is returned when executing AddStream(), the stream with invalid + // configuration was already created in the server. + ErrStreamSourceMultipleSubjectTransformsNotSupported JetStreamError = &jsError{message: "stream sourceing with multiple subject transforms not supported by nats-server"} + // ErrConsumerNotFound is an error returned when consumer with given name does not exist. ErrConsumerNotFound JetStreamError = &jsError{apiErr: &APIError{ErrorCode: JSErrCodeConsumerNotFound, Description: "consumer not found", Code: 404}} diff --git a/jsm.go b/jsm.go index 32c5ea4ee..78eab0576 100644 --- a/jsm.go +++ b/jsm.go @@ -161,13 +161,14 @@ type Placement struct { // StreamSource dictates how streams can source from other streams. type StreamSource struct { - Name string `json:"name"` - OptStartSeq uint64 `json:"opt_start_seq,omitempty"` - OptStartTime *time.Time `json:"opt_start_time,omitempty"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransformDest string `json:"subject_transform_dest,omitempty"` - External *ExternalStream `json:"external,omitempty"` - Domain string `json:"-"` + Name string `json:"name"` + OptStartSeq uint64 `json:"opt_start_seq,omitempty"` + OptStartTime *time.Time `json:"opt_start_time,omitempty"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransformDest string `json:"subject_transform_dest,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` + External *ExternalStream `json:"external,omitempty"` + Domain string `json:"-"` } // ExternalStream allows you to qualify access to a stream source in another @@ -795,6 +796,24 @@ func (js *js) AddStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error) { return nil, resp.Error } + // check that input subject transform (if used) is reflected in the returned ConsumerInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if cfg.Sources[i].SubjectTransformDest != _EMPTY_ && resp.Sources[i].SubjectTransformDest == _EMPTY_ { + return nil, ErrStreamSourceSubjectTransformNotSupported + } + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported + } + } + } + return resp.StreamInfo, nil } @@ -912,13 +931,14 @@ type StreamAlternate struct { // StreamSourceInfo shows information about an upstream stream source. type StreamSourceInfo struct { - Name string `json:"name"` - Lag uint64 `json:"lag"` - Active time.Duration `json:"active"` - External *ExternalStream `json:"external"` - Error *APIError `json:"error"` - FilterSubject string `json:"filter_subject,omitempty"` - SubjectTransformDest string `json:"subject_transform_dest,omitempty"` + Name string `json:"name"` + Lag uint64 `json:"lag"` + Active time.Duration `json:"active"` + External *ExternalStream `json:"external"` + Error *APIError `json:"error"` + FilterSubject string `json:"filter_subject,omitempty"` + SubjectTransformDest string `json:"subject_transform_dest,omitempty"` + SubjectTransforms []SubjectTransformConfig `json:"subject_transforms,omitempty"` } // StreamState is information about the given stream. @@ -990,6 +1010,26 @@ func (js *js) UpdateStream(cfg *StreamConfig, opts ...JSOpt) (*StreamInfo, error } return nil, resp.Error } + + // check that input subject transform (if used) is reflected in the returned StreamInfo + if cfg.SubjectTransform != nil && resp.StreamInfo.Config.SubjectTransform == nil { + return nil, ErrStreamSubjectTransformNotSupported + } + + if len(cfg.Sources) != 0 { + if len(cfg.Sources) != len(resp.Sources) { + return nil, ErrStreamSourceNotSupported + } + for i := range cfg.Sources { + if cfg.Sources[i].SubjectTransformDest != _EMPTY_ && resp.Sources[i].SubjectTransformDest == _EMPTY_ { + return nil, ErrStreamSourceSubjectTransformNotSupported + } + if len(cfg.Sources[i].SubjectTransforms) != 0 && len(resp.Sources[i].SubjectTransforms) == 0 { + return nil, ErrStreamSourceMultipleSubjectTransformsNotSupported + } + } + } + return resp.StreamInfo, nil }