From 1148776d370d0f8a311c3b71783baae0051134b8 Mon Sep 17 00:00:00 2001 From: Christian Leinweber Date: Mon, 8 Aug 2022 18:53:04 +0200 Subject: [PATCH] binding.eventhub: use same name schema for storage leases like on pubsub.eventhub (#1940) * binding.eventhub: use same name schema for storage leases like on pubsub.eventhub Signed-off-by: Christian Leinweber * binding.eventhub: rename AzureEventshub local var Signed-off-by: Christian Leinweber --- bindings/azure/eventhubs/eventhubs.go | 31 ++++++++++++++++++- bindings/azure/eventhubs/eventhubs_test.go | 35 ++++++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/bindings/azure/eventhubs/eventhubs.go b/bindings/azure/eventhubs/eventhubs.go index e6416dc7e0..55914d1f54 100644 --- a/bindings/azure/eventhubs/eventhubs.go +++ b/bindings/azure/eventhubs/eventhubs.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "strconv" + "strings" "time" "github.com/Azure/azure-amqp-common-go/v3/aad" @@ -154,6 +155,28 @@ func validate(connectionString string) error { return err } +func (a *AzureEventHubs) getStoragePrefixString() (string, error) { + hubName, err := a.validateAndGetHubName() + if err != nil { + return "", err + } + + // empty string in the end of slice to have a suffix "-". + return strings.Join([]string{"dapr", hubName, a.metadata.consumerGroup, ""}, "-"), nil +} + +func (a *AzureEventHubs) validateAndGetHubName() (string, error) { + hubName := a.metadata.eventHubName + if hubName == "" { + parsed, err := conn.ParsedConnectionFromStr(a.metadata.connectionString) + if err != nil { + return "", err + } + hubName = parsed.HubName + } + return hubName, nil +} + // Init performs metadata init. func (a *AzureEventHubs) Init(metadata bindings.Metadata) error { m, err := parseMetadata(metadata) @@ -360,7 +383,13 @@ func (a *AzureEventHubs) RegisterPartitionedEventProcessor(ctx context.Context, // RegisterEventProcessor - receive eventhub messages by eventprocessor // host by balancing partitions. func (a *AzureEventHubs) RegisterEventProcessor(ctx context.Context, handler bindings.Handler) error { - leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, a.metadata.storageAccountName, a.metadata.storageContainerName, *a.azureEnvironment) + storagePrefix, err := a.getStoragePrefixString() + if err != nil { + return err + } + + leaserPrefixOpt := storage.WithPrefixInBlobPath(storagePrefix) + leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, a.metadata.storageAccountName, a.metadata.storageContainerName, *a.azureEnvironment, leaserPrefixOpt) if err != nil { return err } diff --git a/bindings/azure/eventhubs/eventhubs_test.go b/bindings/azure/eventhubs/eventhubs_test.go index e8c4d5364c..086dcfaea8 100644 --- a/bindings/azure/eventhubs/eventhubs_test.go +++ b/bindings/azure/eventhubs/eventhubs_test.go @@ -17,10 +17,45 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/dapr/components-contrib/bindings" + "github.com/dapr/kit/logger" ) +var testLogger = logger.NewLogger("test") + +func TestGetStoragePrefixString(t *testing.T) { + props := map[string]string{"storageAccountName": "fake", "storageAccountKey": "fake", "consumerGroup": "default", "storageContainerName": "test", "eventHub": "hubName", "eventHubNamespace": "fake"} + + metadata := bindings.Metadata{Properties: props} + m, err := parseMetadata(metadata) + + require.NoError(t, err) + + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + + actual, _ := aeh.getStoragePrefixString() + + assert.Equal(t, "dapr-hubName-default-", actual) +} + +func TestGetStoragePrefixStringWithHubNameFromConnectionString(t *testing.T) { + connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=hubName" + props := map[string]string{"storageAccountName": "fake", "storageAccountKey": "fake", "consumerGroup": "default", "storageContainerName": "test", "connectionString": connectionString} + + metadata := bindings.Metadata{Properties: props} + m, err := parseMetadata(metadata) + + require.NoError(t, err) + + aeh := &AzureEventHubs{logger: testLogger, metadata: m} + + actual, _ := aeh.getStoragePrefixString() + + assert.Equal(t, "dapr-hubName-default-", actual) +} + func TestParseMetadata(t *testing.T) { t.Run("test valid configuration", func(t *testing.T) { props := map[string]string{connectionString: "fake", consumerGroup: "mygroup", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"}