diff --git a/pubsub/pulsar/metadata.go b/pubsub/pulsar/metadata.go index 62b3b06bbc..ba5067796b 100644 --- a/pubsub/pulsar/metadata.go +++ b/pubsub/pulsar/metadata.go @@ -20,25 +20,25 @@ import ( ) type pulsarMetadata struct { - Host string `mapstructure:"host"` - ConsumerID string `mapstructure:"consumerID"` - EnableTLS bool `mapstructure:"enableTLS"` - DisableBatching bool `mapstructure:"disableBatching"` - BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"` - BatchingMaxSize uint `mapstructure:"batchingMaxSize"` - BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"` - Tenant string `mapstructure:"tenant"` - Namespace string `mapstructure:"namespace"` - Persistent bool `mapstructure:"persistent"` - RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"` - internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"` - PublicKey string `mapstructure:"publicKey"` - PrivateKey string `mapstructure:"privateKey"` - Keys string `mapstructure:"keys"` - MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"` - ReceiverQueueSize int `mapstructure:"receiverQueueSize"` - - Token string `mapstructure:"token"` + Host string `mapstructure:"host"` + ConsumerID string `mapstructure:"consumerID"` + EnableTLS bool `mapstructure:"enableTLS"` + DisableBatching bool `mapstructure:"disableBatching"` + BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"` + BatchingMaxSize uint `mapstructure:"batchingMaxSize"` + BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"` + Tenant string `mapstructure:"tenant"` + Namespace string `mapstructure:"namespace"` + Persistent bool `mapstructure:"persistent"` + RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"` + internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"` + PublicKey string `mapstructure:"publicKey"` + PrivateKey string `mapstructure:"privateKey"` + Keys string `mapstructure:"keys"` + MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"` + ReceiverQueueSize int `mapstructure:"receiverQueueSize"` + SubscriptionType string `mapstructure:"subscribeType"` + Token string `mapstructure:"token"` oauth2.ClientCredentialsMetadata `mapstructure:",squash"` } diff --git a/pubsub/pulsar/metadata.yaml b/pubsub/pulsar/metadata.yaml index 7cc216cf12..63421fe3c1 100644 --- a/pubsub/pulsar/metadata.yaml +++ b/pubsub/pulsar/metadata.yaml @@ -183,4 +183,13 @@ metadata: Sets the size of the consumer receive queue. Controls how many messages can be accumulated by the consumer before it is explicitly called to read messages by Dapr. default: '"1000"' - example: '"1000"' \ No newline at end of file + example: '"1000"' + - name: subscribeType + type: string + description: | + Pulsar supports four subscription types:"shared", "exclusive", "failover", "key_shared". + default: '"shared"' + example: '"exclusive"' + url: + title: "Pulsar Subscription Types" + url: "https://pulsar.apache.org/docs/3.0.x/concepts-messaging/#subscription-types" \ No newline at end of file diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index 7822d63f5e..39074509ce 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -138,6 +138,12 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) { return nil, errors.New("pulsar error: missing pulsar host") } + var err error + m.SubscriptionType, err = parseSubscriptionType(meta.Properties[subscribeTypeKey]) + if err != nil { + return nil, errors.New("invalid subscription type. Accepted values are `exclusive`, `shared`, `failover` and `key_shared`") + } + for k, v := range meta.Properties { switch { case strings.HasSuffix(k, topicJSONSchemaIdentifier): @@ -170,10 +176,8 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error { return err } pulsarURL := m.Host - if !strings.HasPrefix(m.Host, "http://") && - !strings.HasPrefix(m.Host, "https://") { - pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, m.Host) - } + + pulsarURL = sanitiseURL(pulsarURL) options := pulsar.ClientOptions{ URL: pulsarURL, OperationTimeout: 30 * time.Second, @@ -226,6 +230,23 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error { return nil } +func sanitiseURL(pulsarURL string) string { + prefixes := []string{"pulsar+ssl://", "pulsar://", "http://", "https://"} + + hasPrefix := false + for _, prefix := range prefixes { + if strings.HasPrefix(pulsarURL, prefix) { + hasPrefix = true + break + } + } + + if !hasPrefix { + pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, pulsarURL) + } + return pulsarURL +} + func (p *Pulsar) useProducerEncryption() bool { return p.metadata.PublicKey != "" && p.metadata.Keys != "" } @@ -370,11 +391,22 @@ func parsePublishMetadata(req *pubsub.PublishRequest, schema schemaMetadata) ( return msg, nil } -// default: shared -func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType { +func parseSubscriptionType(in string) (string, error) { + subsType := strings.ToLower(in) + switch subsType { + case subscribeTypeExclusive, subscribeTypeFailover, subscribeTypeShared, subscribeTypeKeyShared: + return subsType, nil + case "": + return subscribeTypeShared, nil + default: + return "", fmt.Errorf("invalid subscription type: %s", subsType) + } +} + +// getSubscribeType doesn't do extra validations, because they were done in parseSubscriptionType. +func getSubscribeType(subsTypeStr string) pulsar.SubscriptionType { var subsType pulsar.SubscriptionType - subsTypeStr := strings.ToLower(metadata[subscribeTypeKey]) switch subsTypeStr { case subscribeTypeExclusive: subsType = pulsar.Exclusive @@ -384,8 +416,6 @@ func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType { subsType = pulsar.Shared case subscribeTypeKeyShared: subsType = pulsar.KeyShared - default: - subsType = pulsar.Shared } return subsType @@ -400,15 +430,27 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han topic := p.formatTopic(req.Topic) + subscribeType := p.metadata.SubscriptionType + if s, exists := req.Metadata[subscribeTypeKey]; exists { + subscribeType = s + } + options := pulsar.ConsumerOptions{ Topic: topic, SubscriptionName: p.metadata.ConsumerID, - Type: getSubscribeType(req.Metadata), + Type: getSubscribeType(subscribeType), MessageChannel: channel, NackRedeliveryDelay: p.metadata.RedeliveryDelay, ReceiverQueueSize: p.metadata.ReceiverQueueSize, } + // Handle KeySharedPolicy for key_shared subscription type + if options.Type == pulsar.KeyShared { + options.KeySharedPolicy = &pulsar.KeySharedPolicy{ + Mode: pulsar.KeySharedPolicyModeAutoSplit, + } + } + if p.useConsumerEncryption() { var reader crypto.KeyReader if isValidPEM(p.metadata.PublicKey) { @@ -430,6 +472,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han p.logger.Debugf("Could not subscribe to %s, full topic name in pulsar is %s", req.Topic, topic) return err } + p.logger.Debugf("Subscribed to '%s'(%s) with type '%s'", req.Topic, topic, subscribeType) p.wg.Add(2) listenCtx, cancel := context.WithCancel(ctx) diff --git a/pubsub/pulsar/pulsar_test.go b/pubsub/pulsar/pulsar_test.go index 82b8157cd1..da1c62247e 100644 --- a/pubsub/pulsar/pulsar_test.go +++ b/pubsub/pulsar/pulsar_test.go @@ -48,6 +48,73 @@ func TestParsePulsarMetadata(t *testing.T) { assert.Equal(t, uint(200), meta.BatchingMaxMessages) assert.Equal(t, uint(333), meta.MaxConcurrentHandlers) assert.Empty(t, meta.internalTopicSchemas) + assert.Equal(t, "shared", meta.SubscriptionType) +} + +func TestParsePulsarMetadataSubscriptionType(t *testing.T) { + tt := []struct { + name string + subscribeType string + expected string + err bool + }{ + { + name: "test valid subscribe type - key_shared", + subscribeType: "key_shared", + expected: "key_shared", + err: false, + }, + { + name: "test valid subscribe type - shared", + subscribeType: "shared", + expected: "shared", + err: false, + }, + { + name: "test valid subscribe type - failover", + subscribeType: "failover", + expected: "failover", + err: false, + }, + { + name: "test valid subscribe type - exclusive", + subscribeType: "exclusive", + expected: "exclusive", + err: false, + }, + { + name: "test valid subscribe type - empty", + subscribeType: "", + expected: "shared", + err: false, + }, + { + name: "test invalid subscribe type", + subscribeType: "invalid", + err: true, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + m := pubsub.Metadata{} + + m.Properties = map[string]string{ + "host": "a", + "subscribeType": tc.subscribeType, + } + meta, err := parsePulsarMetadata(m) + + if tc.err { + require.Error(t, err) + assert.Nil(t, meta) + return + } + + require.NoError(t, err) + assert.Equal(t, tc.expected, meta.SubscriptionType) + }) + } } func TestParsePulsarSchemaMetadata(t *testing.T) { @@ -328,3 +395,27 @@ func TestEncryptionKeys(t *testing.T) { assert.False(t, r) }) } + +func TestSanitiseURL(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + {"With pulsar+ssl prefix", "pulsar+ssl://localhost:6650", "pulsar+ssl://localhost:6650"}, + {"With pulsar prefix", "pulsar://localhost:6650", "pulsar://localhost:6650"}, + {"With http prefix", "http://localhost:6650", "http://localhost:6650"}, + {"With https prefix", "https://localhost:6650", "https://localhost:6650"}, + {"Without prefix", "localhost:6650", "pulsar://localhost:6650"}, + {"Empty string", "", "pulsar://"}, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := sanitiseURL(test.input) + if actual != test.expected { + t.Errorf("sanitiseURL(%q) = %q; want %q", test.input, actual, test.expected) + } + }) + } +}