Skip to content

Commit

Permalink
Fixes subscription type not being parsed and not being able to connec…
Browse files Browse the repository at this point in the history
…t to remote endpoints starting with `pulsar://` or `pulsar+ssl://`

Signed-off-by: Elena Kolevska <[email protected]>
  • Loading branch information
elena-kolevska committed Nov 15, 2024
1 parent a00a853 commit 500ea3e
Show file tree
Hide file tree
Showing 3 changed files with 157 additions and 29 deletions.
38 changes: 19 additions & 19 deletions pubsub/pulsar/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,25 @@ import (
)

type pulsarMetadata struct {
Host string `mapstructure:"host"`
ConsumerID string `mapstructure:"consumerID"`
EnableTLS bool `mapstructure:"enableTLS"`
DisableBatching bool `mapstructure:"disableBatching"`
BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"`
BatchingMaxSize uint `mapstructure:"batchingMaxSize"`
BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"`
Tenant string `mapstructure:"tenant"`
Namespace string `mapstructure:"namespace"`
Persistent bool `mapstructure:"persistent"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`

Token string `mapstructure:"token"`
Host string `mapstructure:"host"`
ConsumerID string `mapstructure:"consumerID"`
EnableTLS bool `mapstructure:"enableTLS"`
DisableBatching bool `mapstructure:"disableBatching"`
BatchingMaxPublishDelay time.Duration `mapstructure:"batchingMaxPublishDelay"`
BatchingMaxSize uint `mapstructure:"batchingMaxSize"`
BatchingMaxMessages uint `mapstructure:"batchingMaxMessages"`
Tenant string `mapstructure:"tenant"`
Namespace string `mapstructure:"namespace"`
Persistent bool `mapstructure:"persistent"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"`
ReceiverQueueSize int `mapstructure:"receiverQueueSize"`
SubscriptionType string `mapstructure:"subscribeType"`
Token string `mapstructure:"token"`
oauth2.ClientCredentialsMetadata `mapstructure:",squash"`
}

Expand Down
57 changes: 47 additions & 10 deletions pubsub/pulsar/pulsar.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) {
return nil, errors.New("pulsar error: missing pulsar host")
}

var err error
m.SubscriptionType, err = parseSubscriptionType(meta.Properties[subscribeTypeKey])
if err != nil {
return nil, errors.New("invalid subscription type. Accepted values are `exclusive`, `shared`, `failover` and `key_shared`")
}

for k, v := range meta.Properties {
switch {
case strings.HasSuffix(k, topicJSONSchemaIdentifier):
Expand Down Expand Up @@ -170,10 +176,8 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
return err
}
pulsarURL := m.Host
if !strings.HasPrefix(m.Host, "http://") &&
!strings.HasPrefix(m.Host, "https://") {
pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, m.Host)
}

pulsarURL = sanitiseURL(pulsarURL)
options := pulsar.ClientOptions{
URL: pulsarURL,
OperationTimeout: 30 * time.Second,
Expand Down Expand Up @@ -226,6 +230,23 @@ func (p *Pulsar) Init(ctx context.Context, metadata pubsub.Metadata) error {
return nil
}

func sanitiseURL(pulsarURL string) string {
prefixes := []string{"pulsar+ssl://", "pulsar://", "http://", "https://"}

hasPrefix := false
for _, prefix := range prefixes {
if strings.HasPrefix(pulsarURL, prefix) {
hasPrefix = true
break
}
}

if !hasPrefix {
pulsarURL = fmt.Sprintf("%s%s", pulsarPrefix, pulsarURL)
}
return pulsarURL
}

func (p *Pulsar) useProducerEncryption() bool {
return p.metadata.PublicKey != "" && p.metadata.Keys != ""
}
Expand Down Expand Up @@ -370,11 +391,22 @@ func parsePublishMetadata(req *pubsub.PublishRequest, schema schemaMetadata) (
return msg, nil
}

// default: shared
func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType {
func parseSubscriptionType(in string) (string, error) {
subsType := strings.ToLower(in)
switch subsType {
case subscribeTypeExclusive, subscribeTypeFailover, subscribeTypeShared, subscribeTypeKeyShared:
return subsType, nil
case "":
return subscribeTypeShared, nil
default:
return "", fmt.Errorf("invalid subscription type: %s", subsType)
}
}

// getSubscribeType doesn't do extra validations, because they were done in parseSubscriptionType.
func getSubscribeType(subsTypeStr string) pulsar.SubscriptionType {
var subsType pulsar.SubscriptionType

subsTypeStr := strings.ToLower(metadata[subscribeTypeKey])
switch subsTypeStr {
case subscribeTypeExclusive:
subsType = pulsar.Exclusive
Expand All @@ -384,8 +416,6 @@ func getSubscribeType(metadata map[string]string) pulsar.SubscriptionType {
subsType = pulsar.Shared
case subscribeTypeKeyShared:
subsType = pulsar.KeyShared
default:
subsType = pulsar.Shared
}

return subsType
Expand All @@ -403,12 +433,19 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
options := pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: p.metadata.ConsumerID,
Type: getSubscribeType(req.Metadata),
Type: getSubscribeType(p.metadata.SubscriptionType),
MessageChannel: channel,
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
ReceiverQueueSize: p.metadata.ReceiverQueueSize,
}

// Handle KeySharedPolicy for key_shared subscription type
if options.Type == pulsar.KeyShared {
options.KeySharedPolicy = &pulsar.KeySharedPolicy{
Mode: pulsar.KeySharedPolicyModeAutoSplit,
}
}

if p.useConsumerEncryption() {
var reader crypto.KeyReader
if isValidPEM(p.metadata.PublicKey) {
Expand Down
91 changes: 91 additions & 0 deletions pubsub/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,73 @@ func TestParsePulsarMetadata(t *testing.T) {
assert.Equal(t, uint(200), meta.BatchingMaxMessages)
assert.Equal(t, uint(333), meta.MaxConcurrentHandlers)
assert.Empty(t, meta.internalTopicSchemas)
assert.Equal(t, "shared", meta.SubscriptionType)
}

func TestParsePulsarMetadataSubscriptionType(t *testing.T) {
tt := []struct {
name string
subscribeType string
expected string
err bool
}{
{
name: "test valid subscribe type - key_shared",
subscribeType: "key_shared",
expected: "key_shared",
err: false,
},
{
name: "test valid subscribe type - shared",
subscribeType: "shared",
expected: "shared",
err: false,
},
{
name: "test valid subscribe type - failover",
subscribeType: "failover",
expected: "failover",
err: false,
},
{
name: "test valid subscribe type - exclusive",
subscribeType: "exclusive",
expected: "exclusive",
err: false,
},
{
name: "test valid subscribe type - empty",
subscribeType: "",
expected: "shared",
err: false,
},
{
name: "test invalid subscribe type",
subscribeType: "invalid",
err: true,
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
m := pubsub.Metadata{}

m.Properties = map[string]string{
"host": "a",
"subscribeType": tc.subscribeType,
}
meta, err := parsePulsarMetadata(m)

if tc.err {
require.Error(t, err)
assert.Nil(t, meta)
return
}

require.NoError(t, err)
assert.Equal(t, tc.expected, meta.SubscriptionType)
})
}
}

func TestParsePulsarSchemaMetadata(t *testing.T) {
Expand Down Expand Up @@ -328,3 +395,27 @@ func TestEncryptionKeys(t *testing.T) {
assert.False(t, r)
})
}

func TestSanitiseURL(t *testing.T) {
tests := []struct {
name string
input string
expected string
}{
{"With pulsar+ssl prefix", "pulsar+ssl://localhost:6650", "pulsar+ssl://localhost:6650"},
{"With pulsar prefix", "pulsar://localhost:6650", "pulsar://localhost:6650"},
{"With http prefix", "http://localhost:6650", "http://localhost:6650"},
{"With https prefix", "https://localhost:6650", "https://localhost:6650"},
{"Without prefix", "localhost:6650", "pulsar://localhost:6650"},
{"Empty string", "", "pulsar://"},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actual := sanitiseURL(test.input)
if actual != test.expected {
t.Errorf("sanitiseURL(%q) = %q; want %q", test.input, actual, test.expected)
}
})
}
}

0 comments on commit 500ea3e

Please sign in to comment.