Skip to content

Commit

Permalink
Fixes subscribeType metadata field not being respected for Pulsar p…
Browse files Browse the repository at this point in the history
…ub sub (#3603)

Signed-off-by: Elena Kolevska <[email protected]>
Co-authored-by: Artur Souza <[email protected]>
  • Loading branch information
elena-kolevska and artursouza authored Nov 22, 2024
1 parent f521a76 commit 2aea319
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 30 deletions.
38 changes: 19 additions & 19 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@ import (
)

type pulsarMetadata struct {
Host string `mapstructure:"host"`
ConsumerID string `mapstructure:"consumerID"`
EnableTLS bool `mapstructure:"enableTLS"`
DisableBatching bool `mapstructure:"disableBatching"`
BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"`
BatchingMaxSize uint `mapstructure:"batchingMaxSize"`
BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"`
Tenant string `mapstructure:"tenant"`
Namespace string `mapstructure:"namespace"`
Persistent bool `mapstructure:"persistent"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`

Token string `mapstructure:"token"`
Host string `mapstructure:"host"`
ConsumerID string `mapstructure:"consumerID"`
EnableTLS bool `mapstructure:"enableTLS"`
DisableBatching bool `mapstructure:"disableBatching"`
BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"`
BatchingMaxSize uint `mapstructure:"batchingMaxSize"`
BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"`
Tenant string `mapstructure:"tenant"`
Namespace string `mapstructure:"namespace"`
Persistent bool `mapstructure:"persistent"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
SubscriptionType string `mapstructure:"subscribeType"`
Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
}

Expand Down
11 changes: 10 additions & 1 deletion pubsub/pulsar/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,13 @@ metadata:
Sets the size of the consumer receive queue.
Controls how many messages can be accumulated by the consumer before it is explicitly called to read messages by Dapr.
default: '"1000"'
example: '"1000"'
example: '"1000"'
- name: subscribeType
type: string
description: |
Pulsar supports four subscription types:"shared", "exclusive", "failover", "key_shared".
default: '"shared"'
example: '"exclusive"'
url:
title: "Pulsar Subscription Types"
url: "https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#subscription-types"
63 changes: 53 additions & 10 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
return nil, errors.New("pulsar error: missing pulsar host")
}

var err error
m.SubscriptionType, err = parseSubscriptionType(meta.Properties[subscribeTypeKey])
if err != nil {
return nil, errors.New("invalid subscription type. Accepted values are `exclusive`, `shared`, `failover` and `key_shared`")
}

for k, v := range meta.Properties {
switch {
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
Expand Down Expand Up @@ -170,10 +176,8 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
return err
}
pulsarURL := m.Host
if !strings.HasPrefix(m.Host, "http://") &&
!strings.HasPrefix(m.Host, "https://") {
pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, m.Host)
}

pulsarURL = sanitiseURL(pulsarURL)
options := pulsar.ClientOptions{
URL: pulsarURL,
OperationTimeout: 30 * time.Second,
Expand Down Expand Up @@ -226,6 +230,23 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
return nil
}

func sanitiseURL(pulsarURL string) string {
prefixes := []string{"pulsar+ssl://", "pulsar://", "http://", "https://"}

hasPrefix := false
for _, prefix := range prefixes {
if strings.HasPrefix(pulsarURL, prefix) {
hasPrefix = true
break
}
}

if !hasPrefix {
pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, pulsarURL)
}
return pulsarURL
}

func (p *Pulsar) useProducerEncryption() bool {
return p.metadata.PublicKey != "" && p.metadata.Keys != ""
}
Expand Down Expand Up @@ -370,11 +391,22 @@ func parsePublishMetadata(req *pubsub.PublishRequest, schema schemaMetadata) (
return msg, nil
}

// default: shared
func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType {
func parseSubscriptionType(in string) (string, error) {
subsType := strings.ToLower(in)
switch subsType {
case subscribeTypeExclusive, subscribeTypeFailover, subscribeTypeShared, subscribeTypeKeyShared:
return subsType, nil
case "":
return subscribeTypeShared, nil
default:
return "", fmt.Errorf("invalid subscription type: %s", subsType)
}
}

// getSubscribeType doesn't do extra validations, because they were done in parseSubscriptionType.
func getSubscribeType(subsTypeStr string) pulsar.SubscriptionType {
var subsType pulsar.SubscriptionType

subsTypeStr := strings.ToLower(metadata[subscribeTypeKey])
switch subsTypeStr {
case subscribeTypeExclusive:
subsType = pulsar.Exclusive
Expand All @@ -384,8 +416,6 @@ func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType {
subsType = pulsar.Shared
case subscribeTypeKeyShared:
subsType = pulsar.KeyShared
default:
subsType = pulsar.Shared
}

return subsType
Expand All @@ -400,15 +430,27 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han

topic := p.formatTopic(req.Topic)

subscribeType := p.metadata.SubscriptionType
if s, exists := req.Metadata[subscribeTypeKey]; exists {
subscribeType = s
}

options := pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: p.metadata.ConsumerID,
Type: getSubscribeType(req.Metadata),
Type: getSubscribeType(subscribeType),
MessageChannel: channel,
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
}

// Handle KeySharedPolicy for key_shared subscription type
if options.Type == pulsar.KeyShared {
options.KeySharedPolicy = &pulsar.KeySharedPolicy{
Mode: pulsar.KeySharedPolicyModeAutoSplit,
}
}

if p.useConsumerEncryption() {
var reader crypto.KeyReader
if isValidPEM(p.metadata.PublicKey) {
Expand All @@ -430,6 +472,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
p.logger.Debugf("Could not subscribe to %s, full topic name in pulsar is %s", req.Topic, topic)
return err
}
p.logger.Debugf("Subscribed to '%s'(%s) with type '%s'", req.Topic, topic, subscribeType)

p.wg.Add(2)
listenCtx, cancel := context.WithCancel(ctx)
Expand Down
91 changes: 91 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,73 @@ func TestParsePulsarMetadata(t *testing.T) {
assert.Equal(t, uint(200), meta.BatchingMaxMessages)
assert.Equal(t, uint(333), meta.MaxConcurrentHandlers)
assert.Empty(t, meta.internalTopicSchemas)
assert.Equal(t, "shared", meta.SubscriptionType)
}

func TestParsePulsarMetadataSubscriptionType(t *testing.T) {
tt := []struct {
name string
subscribeType string
expected string
err bool
}{
{
name: "test valid subscribe type - key_shared",
subscribeType: "key_shared",
expected: "key_shared",
err: false,
},
{
name: "test valid subscribe type - shared",
subscribeType: "shared",
expected: "shared",
err: false,
},
{
name: "test valid subscribe type - failover",
subscribeType: "failover",
expected: "failover",
err: false,
},
{
name: "test valid subscribe type - exclusive",
subscribeType: "exclusive",
expected: "exclusive",
err: false,
},
{
name: "test valid subscribe type - empty",
subscribeType: "",
expected: "shared",
err: false,
},
{
name: "test invalid subscribe type",
subscribeType: "invalid",
err: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
m := pubsub.Metadata{}

m.Properties = map[string]string{
"host": "a",
"subscribeType": tc.subscribeType,
}
meta, err := parsePulsarMetadata(m)

if tc.err {
require.Error(t, err)
assert.Nil(t, meta)
return
}

require.NoError(t, err)
assert.Equal(t, tc.expected, meta.SubscriptionType)
})
}
}

func TestParsePulsarSchemaMetadata(t *testing.T) {
Expand Down Expand Up @@ -328,3 +395,27 @@ func TestEncryptionKeys(t *testing.T) {
assert.False(t, r)
})
}

func TestSanitiseURL(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{"With pulsar+ssl prefix", "pulsar+ssl://localhost:6650", "pulsar+ssl://localhost:6650"},
{"With pulsar prefix", "pulsar://localhost:6650", "pulsar://localhost:6650"},
{"With http prefix", "http://localhost:6650", "http://localhost:6650"},
{"With https prefix", "https://localhost:6650", "https://localhost:6650"},
{"Without prefix", "localhost:6650", "pulsar://localhost:6650"},
{"Empty string", "", "pulsar://"},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actual := sanitiseURL(test.input)
if actual != test.expected {
t.Errorf("sanitiseURL(%q) = %q; want %q", test.input, actual, test.expected)
}
})
}
}

0 comments on commit 2aea319

Please sign in to comment.