From 6b587954fa379bf2df6042c04a2e43065a23f5b9 Mon Sep 17 00:00:00 2001 From: cmendible Date: Mon, 4 Jul 2022 20:38:36 +0000 Subject: [PATCH] Fixing #1845 Signed-off-by: cmendible --- bindings/azure/servicebusqueues/metadata.go | 104 ++++++++++++------ .../azure/servicebusqueues/metadata_test.go | 82 ++++++++++---- 2 files changed, 135 insertions(+), 51 deletions(-) diff --git a/bindings/azure/servicebusqueues/metadata.go b/bindings/azure/servicebusqueues/metadata.go index 8fb8db8ebe..2704d0d355 100644 --- a/bindings/azure/servicebusqueues/metadata.go +++ b/bindings/azure/servicebusqueues/metadata.go @@ -1,8 +1,9 @@ package servicebusqueues import ( - "encoding/json" "errors" + "fmt" + "strconv" "strings" "time" @@ -27,6 +28,18 @@ type serviceBusQueuesMetadata struct { } const ( + // Keys + connectionString = "connectionString" + namespaceName = "namespaceName" + queueName = "queueName" + timeoutInSec = "timeoutInSec" + maxConnectionRecoveryInSec = "maxConnectionRecoveryInSec" + minConnectionRecoveryInSec = "minConnectionRecoveryInSec" + maxRetriableErrorsPerSec = "maxRetriableErrorsPerSec" + maxActiveMessages = "maxActiveMessages" + lockRenewalInSec = "lockRenewalInSec" + maxConcurrentHandlers = "maxConcurrentHandlers" + // Default time to live for queues, which is 14 days. The same way Azure Portal does. defaultMessageTimeToLive = time.Hour * 24 * 14 @@ -50,18 +63,19 @@ const ( // Default rate of retriable errors per second defaultMaxRetriableErrorsPerSec = 10 + + errorMessagePrefix = "azure service bus error:" ) func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serviceBusQueuesMetadata, error) { - b, err := json.Marshal(metadata.Properties) - if err != nil { - return nil, err + m := serviceBusQueuesMetadata{} + + if val, ok := metadata.Properties[connectionString]; ok && val != "" { + m.ConnectionString = val } - var m serviceBusQueuesMetadata - err = json.Unmarshal(b, &m) - if err != nil { - return nil, err + if val, ok := metadata.Properties[namespaceName]; ok && val != "" { + m.NamespaceName = val } if m.ConnectionString != "" && m.NamespaceName != "" { @@ -78,46 +92,74 @@ func (a *AzureServiceBusQueues) parseMetadata(metadata bindings.Metadata) (*serv } m.ttl = ttl - // Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior. - m.QueueName = strings.ToLower(m.QueueName) - - if m.TimeoutInSec < 1 { - m.TimeoutInSec = defaultTimeoutInSec + if val, ok := metadata.Properties[queueName]; ok && val != "" { + // Queue names are case-insensitive and are forced to lowercase. This mimics the Azure portal's behavior. + m.QueueName = strings.ToLower(val) } - if m.MinConnectionRecoveryInSec < 1 { - m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec + /* Optional configuration settings - defaults will be set by the client. */ + m.TimeoutInSec = defaultTimeoutInSec + if val, ok := metadata.Properties[timeoutInSec]; ok && val != "" { + m.TimeoutInSec, err = strconv.Atoi(val) + if err != nil { + return &m, fmt.Errorf("%s invalid timeoutInSec %s, %s", errorMessagePrefix, val, err) + } } - if m.MaxConnectionRecoveryInSec < 1 { - m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec + m.MinConnectionRecoveryInSec = defaultMinConnectionRecoveryInSec + if val, ok := metadata.Properties[minConnectionRecoveryInSec]; ok && val != "" { + m.MinConnectionRecoveryInSec, err = strconv.Atoi(val) + if err != nil { + return &m, fmt.Errorf("%s invalid minConnectionRecoveryInSec %s, %s", errorMessagePrefix, val, err) + } } - if m.MinConnectionRecoveryInSec > m.MaxConnectionRecoveryInSec { - return nil, errors.New("maxConnectionRecoveryInSec must be greater than minConnectionRecoveryInSec") + m.MaxConnectionRecoveryInSec = defaultMaxConnectionRecoveryInSec + if val, ok := metadata.Properties[maxConnectionRecoveryInSec]; ok && val != "" { + m.MaxConnectionRecoveryInSec, err = strconv.Atoi(val) + if err != nil { + return &m, fmt.Errorf("%s invalid maxConnectionRecoveryInSec %s, %s", errorMessagePrefix, val, err) + } } - if m.MaxActiveMessages < 1 { - m.MaxActiveMessages = defaultMaxActiveMessages + m.MaxActiveMessages = defaultMaxActiveMessages + if val, ok := metadata.Properties[maxActiveMessages]; ok && val != "" { + m.MaxActiveMessages, err = strconv.Atoi(val) + if err != nil { + return &m, fmt.Errorf("%s invalid maxActiveMessages %s, %s", errorMessagePrefix, val, err) + } } - if m.MaxConcurrentHandlers < 1 { - m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers + m.MaxConcurrentHandlers = defaultMaxConcurrentHandlers + if val, ok := metadata.Properties[maxConcurrentHandlers]; ok && val != "" { + m.MaxConcurrentHandlers, err = strconv.Atoi(val) + if err != nil { + return &m, fmt.Errorf("%s invalid maxConcurrentHandlers %s, %s", errorMessagePrefix, val, err) + } } if m.MaxConcurrentHandlers > m.MaxActiveMessages { - return nil, errors.New("maxConcurrentHandlers cannot be bigger than maxActiveMessages") + return nil, fmt.Errorf("%s maxConcurrentHandlers cannot be bigger than maxActiveMessages, %s", errorMessagePrefix, err) } - if m.LockRenewalInSec < 1 { - m.LockRenewalInSec = defaultLockRenewalInSec + m.LockRenewalInSec = defaultLockRenewalInSec + if val, ok := metadata.Properties[lockRenewalInSec]; ok && val != "" { + m.LockRenewalInSec, err = strconv.Atoi(val) + if err != nil { + return &m, fmt.Errorf("%s invalid lockRenewalInSec %s, %s", errorMessagePrefix, val, err) + } } - if m.MaxRetriableErrorsPerSec == nil { - m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec) - } - if *m.MaxRetriableErrorsPerSec < 0 { - return nil, errors.New("maxRetriableErrorsPerSec must be non-negative") + m.MaxRetriableErrorsPerSec = to.Ptr(defaultMaxRetriableErrorsPerSec) + if val, ok := metadata.Properties[maxRetriableErrorsPerSec]; ok && val != "" { + mRetriableErrorsPerSec, err := strconv.Atoi(val) + if err != nil { + return &m, fmt.Errorf("%s invalid lockRenewalInSec %s, %s", errorMessagePrefix, val, err) + } + if mRetriableErrorsPerSec < 0 { + return nil, fmt.Errorf("%smaxRetriableErrorsPerSec must be non-negative, %s", errorMessagePrefix, err) + } + m.MaxRetriableErrorsPerSec = to.Ptr(mRetriableErrorsPerSec) } return &m, nil diff --git a/bindings/azure/servicebusqueues/metadata_test.go b/bindings/azure/servicebusqueues/metadata_test.go index dcdaea384e..268b186ff2 100644 --- a/bindings/azure/servicebusqueues/metadata_test.go +++ b/bindings/azure/servicebusqueues/metadata_test.go @@ -28,32 +28,74 @@ func TestParseMetadata(t *testing.T) { oneSecondDuration := time.Second testCases := []struct { - name string - properties map[string]string - expectedConnectionString string - expectedQueueName string - expectedTTL time.Duration + name string + properties map[string]string + expectedConnectionString string + expectedQueueName string + expectedTTL time.Duration + expectedTimeoutInSec int + expectedMaxConnectionRecoveryInSec int + expectedMinConnectionRecoveryInSec int + expectedMaxRetriableErrorsPerSec int + expectedMaxActiveMessages int + expectedLockRenewalInSec int + expectedMaxConcurrentHandlers int }{ { - name: "ConnectionString and queue name", - properties: map[string]string{"connectionString": "connString", "queueName": "queue1"}, - expectedConnectionString: "connString", - expectedQueueName: "queue1", - expectedTTL: defaultMessageTimeToLive, + name: "ConnectionString and queue name", + properties: map[string]string{"connectionString": "connString", "queueName": "queue1"}, + expectedConnectionString: "connString", + expectedQueueName: "queue1", + expectedTTL: defaultMessageTimeToLive, + expectedTimeoutInSec: defaultTimeoutInSec, + expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec, + expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec, + expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec, + expectedMaxActiveMessages: defaultMaxActiveMessages, + expectedLockRenewalInSec: defaultLockRenewalInSec, + expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers, }, { - name: "Empty TTL", - properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""}, - expectedConnectionString: "connString", - expectedQueueName: "queue1", - expectedTTL: defaultMessageTimeToLive, + name: "ConnectionString, queue name and all optional values", + properties: map[string]string{"connectionString": "connString", "queueName": "queue1", "timeoutInSec": "30", "minConnectionRecoveryInSec": "1", "maxConnectionRecoveryInSec": "200", "maxRetriableErrorsPerSec": "20", "maxActiveMessages": "10", "maxConcurrentHandlers": "2", "lockRenewalInSec": "30"}, + expectedConnectionString: "connString", + expectedQueueName: "queue1", + expectedTTL: defaultMessageTimeToLive, + expectedTimeoutInSec: 30, + expectedMaxConnectionRecoveryInSec: 200, + expectedMinConnectionRecoveryInSec: 1, + expectedMaxRetriableErrorsPerSec: 20, + expectedMaxActiveMessages: 10, + expectedMaxConcurrentHandlers: 2, + expectedLockRenewalInSec: 30, }, { - name: "With TTL", - properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: "1"}, - expectedConnectionString: "connString", - expectedQueueName: "queue1", - expectedTTL: oneSecondDuration, + name: "Empty TTL", + properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: ""}, + expectedConnectionString: "connString", + expectedQueueName: "queue1", + expectedTTL: defaultMessageTimeToLive, + expectedTimeoutInSec: defaultTimeoutInSec, + expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec, + expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec, + expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec, + expectedMaxActiveMessages: defaultMaxActiveMessages, + expectedLockRenewalInSec: defaultLockRenewalInSec, + expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers, + }, + { + name: "With TTL", + properties: map[string]string{"connectionString": "connString", "queueName": "queue1", metadata.TTLMetadataKey: "1"}, + expectedConnectionString: "connString", + expectedQueueName: "queue1", + expectedTTL: oneSecondDuration, + expectedTimeoutInSec: defaultTimeoutInSec, + expectedMaxConnectionRecoveryInSec: defaultMaxConnectionRecoveryInSec, + expectedMinConnectionRecoveryInSec: defaultMinConnectionRecoveryInSec, + expectedMaxRetriableErrorsPerSec: defaultMaxRetriableErrorsPerSec, + expectedMaxActiveMessages: defaultMaxActiveMessages, + expectedLockRenewalInSec: defaultLockRenewalInSec, + expectedMaxConcurrentHandlers: defaultMaxConcurrentHandlers, }, }