Skip to content

Commit

Permalink
Add check for group name
Browse files Browse the repository at this point in the history
Signed-off-by: Tomasz Pietrek <[email protected]>
  • Loading branch information
Jarema committed Nov 12, 2024
1 parent 05820ab commit b88c300
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 6 deletions.
12 changes: 10 additions & 2 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math/rand"
"reflect"
"regexp"
"slices"
"strconv"
"strings"
Expand All @@ -41,6 +42,10 @@ const (
JSPullRequestNatsPinId = "Nats-Pin-Id"
)

var (
validGroupName = regexp.MustCompile(`^[a-zA-Z0-9/_=-]{1,16}$`)
)

// Headers sent when batch size was completed, but there were remaining bytes.
const JsPullRequestRemainingBytesT = "NATS/1.0 409 Batch Completed\r\n%s: %d\r\n%s: %d\r\n\r\n"

Expand Down Expand Up @@ -818,6 +823,9 @@ func checkConsumerCfg(
if group == _EMPTY_ {
return NewJSConsumerEmptyGroupNameError()
}
if !validGroupName.MatchString(group) {
return NewJSConsumerInvalidGroupNameError()
}
}
}
return nil
Expand Down Expand Up @@ -1624,9 +1632,9 @@ func (o *consumer) sendPinnedAdvisoryLocked(group string) {

}
func (o *consumer) sendUnpinnedAdvisoryLocked(group string, reason string) {
e := JSConsumerGroupUnPinnedAdvisory{
e := JSConsumerGroupUnpinnedAdvisory{
TypedEvent: TypedEvent{
Type: JSStreamGroupUnPinnedAdvisoryType,
Type: JSConsumerGroupUnpinnedAdvisoryType,
ID: nuid.Next(),
Time: time.Now().UTC(),
},
Expand Down
10 changes: 10 additions & 0 deletions server/errors.json
Original file line number Diff line number Diff line change
Expand Up @@ -1598,5 +1598,15 @@
"help": "",
"url": "",
"deprecates": ""
},
{
"constant": "JSConsumerInvalidGroupNameErr",
"code": 400,
"error_code": 10162,
"description": "Valid name must match A-Z, a-z, 0-9, -_/=)+ and may not exceed 16 characters",
"comment": "",
"help": "",
"url": "",
"deprecates": ""
}
]
5 changes: 5 additions & 0 deletions server/jetstream_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3424,6 +3424,11 @@ func (s *Server) jsConsumerUnpinRequest(sub *subscription, c *client, _ *Account
return
}

if !validGroupName.MatchString(req.Group) {
resp.Error = NewJSConsumerInvalidGroupNameError()
s.sendAPIErrResponse(ci, acc, subject, reply, string(msg), s.jsonResponse(&resp))
return
}
if s.JetStreamIsClustered() {
// Check to make sure the stream is assigned.
js, cc := s.getJetStreamCluster()
Expand Down
22 changes: 21 additions & 1 deletion server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,6 @@ func TestJetStreamConsumerUnpinPickDifferentRequest(t *testing.T) {
require_NoError(t, err)
pinId := msg.Header.Get("Nats-Pin-Id")
require_NotEqual(t, pinId, "")
fmt.Printf("INITIAL PIN: %v\n", msg.Header.Get("Nats-Pin-Id"))

reqPinned := JSApiConsumerGetNextRequest{Batch: 5, Expires: 15 * time.Second, PriorityGroup: PriorityGroup{
Group: "A",
Expand Down Expand Up @@ -2015,6 +2014,7 @@ func TestJetStreamConsumerUnpin(t *testing.T) {
{"unpin on missing stream", nc, "NOT_EXIST", "C", "A", &ApiError{ErrCode: uint16(JSStreamNotFoundErr)}},
{"unpin on missing consumer", nc, "TEST", "NOT_EXIST", "A", &ApiError{ErrCode: uint16(JSConsumerNotFoundErr)}},
{"unpin missing group", nc, "TEST", "C", "", &ApiError{ErrCode: uint16(JSInvalidJSONErr)}},
{"unpin bad group name", nc, "TEST", "C", "group name\r\n", &ApiError{ErrCode: uint16(JSConsumerInvalidGroupNameErr)}},
{"ok unpin", nc, "TEST", "C", "A", nil},
} {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -2368,6 +2368,26 @@ func TestJetStreamConsumerMultipleFitersWithStartDate(t *testing.T) {

}

func TestPriorityGroupNameRegex(t *testing.T) {
for _, test := range []struct {
name string
group string
valid bool
}{
{"valid-short", "A", true},
{"valid-with-accepted-special-chars", "group/consumer=A", true},
{"empty", "", false},
{"with-space", "A B", false},
{"with-tab", "A B", false},
{"too-long-name", "group-name-that-is-too-long", false},
{"line-termination", "\r\n", false},
} {
t.Run(test.name, func(t *testing.T) {
require_Equal(t, test.valid, validGroupName.MatchString(test.group))
})
}
}

func sendRequest(t *testing.T, nc *nats.Conn, reply string, req JSApiConsumerGetNextRequest) *nats.Subscription {
reqb, _ := json.Marshal(req)
replies, err := nc.SubscribeSync(reply)
Expand Down
14 changes: 14 additions & 0 deletions server/jetstream_errors_generated.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ const (
// JSConsumerInvalidDeliverSubject invalid push consumer deliver subject
JSConsumerInvalidDeliverSubject ErrorIdentifier = 10112

// JSConsumerInvalidGroupNameErr Valid name must match A-Z, a-z, 0-9, -_/=)+ and may not exceed 16 characters
JSConsumerInvalidGroupNameErr ErrorIdentifier = 10162

// JSConsumerInvalidPolicyErrF Generic delivery policy error ({err})
JSConsumerInvalidPolicyErrF ErrorIdentifier = 10094

Expand Down Expand Up @@ -527,6 +530,7 @@ var (
JSConsumerHBRequiresPushErr: {Code: 400, ErrCode: 10088, Description: "consumer idle heartbeat requires a push based consumer"},
JSConsumerInactiveThresholdExcess: {Code: 400, ErrCode: 10153, Description: "consumer inactive threshold exceeds system limit of {limit}"},
JSConsumerInvalidDeliverSubject: {Code: 400, ErrCode: 10112, Description: "invalid push consumer deliver subject"},
JSConsumerInvalidGroupNameErr: {Code: 400, ErrCode: 10162, Description: "Valid name must match A-Z, a-z, 0-9, -_/=)+ and may not exceed 16 characters"},
JSConsumerInvalidPolicyErrF: {Code: 400, ErrCode: 10094, Description: "{err}"},
JSConsumerInvalidPriorityGroupErr: {Code: 400, ErrCode: 10160, Description: "Provided priority group does not exist for this consumer"},
JSConsumerInvalidSamplingErrF: {Code: 400, ErrCode: 10095, Description: "failed to parse consumer sampling configuration: {err}"},
Expand Down Expand Up @@ -1087,6 +1091,16 @@ func NewJSConsumerInvalidDeliverSubjectError(opts ...ErrorOption) *ApiError {
return ApiErrors[JSConsumerInvalidDeliverSubject]
}

// NewJSConsumerInvalidGroupNameError creates a new JSConsumerInvalidGroupNameErr error: "Valid name must match A-Z, a-z, 0-9, -_/=)+ and may not exceed 16 characters"
func NewJSConsumerInvalidGroupNameError(opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
if ae, ok := eopts.err.(*ApiError); ok {
return ae
}

return ApiErrors[JSConsumerInvalidGroupNameErr]
}

// NewJSConsumerInvalidPolicyError creates a new JSConsumerInvalidPolicyErrF error: "{err}"
func NewJSConsumerInvalidPolicyError(err error, opts ...ErrorOption) *ApiError {
eopts := parseOpts(opts)
Expand Down
6 changes: 3 additions & 3 deletions server/jetstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,10 +284,10 @@ type JSConsumerGroupPinnedAdvisory struct {
PinnedClientId string `json:"pinned_id"`
}

const JSStreamGroupUnPinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_unpinned"
const JSConsumerGroupUnpinnedAdvisoryType = "io.nats.jetstream.advisory.v1.consumer_group_unpinned"

// JSConsumerGroupUnPinnedAdvisory indicates that a pin was lost
type JSConsumerGroupUnPinnedAdvisory struct {
// JSConsumerGroupUnpinnedAdvisory indicates that a pin was lost
type JSConsumerGroupUnpinnedAdvisory struct {
TypedEvent
Account string `json:"account,omitempty"`
Stream string `json:"stream"`
Expand Down

0 comments on commit b88c300

Please sign in to comment.