Skip to content

Commit

Permalink
binding.eventhub: use same name schema for storage leases like on pub…
Browse files Browse the repository at this point in the history
…sub.eventhub (dapr#1940)

* binding.eventhub: use same name schema for storage leases like on pubsub.eventhub

Signed-off-by: Christian Leinweber <[email protected]>

* binding.eventhub: rename AzureEventshub local var

Signed-off-by: Christian Leinweber <[email protected]>
Signed-off-by: Andrew Duss <[email protected]>
  • Loading branch information
christle authored and Andrew Duss committed Aug 18, 2022
1 parent ae6708f commit 805d9e4
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 1 deletion.
31 changes: 30 additions & 1 deletion bindings/azure/eventhubs/eventhubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"time"

"github.com/Azure/azure-amqp-common-go/v3/aad"
Expand Down Expand Up @@ -154,6 +155,28 @@ func validate(connectionString string) error {
return err
}

func (a *AzureEventHubs) getStoragePrefixString() (string, error) {
hubName, err := a.validateAndGetHubName()
if err != nil {
return "", err
}

// empty string in the end of slice to have a suffix "-".
return strings.Join([]string{"dapr", hubName, a.metadata.consumerGroup, ""}, "-"), nil
}

func (a *AzureEventHubs) validateAndGetHubName() (string, error) {
hubName := a.metadata.eventHubName
if hubName == "" {
parsed, err := conn.ParsedConnectionFromStr(a.metadata.connectionString)
if err != nil {
return "", err
}
hubName = parsed.HubName
}
return hubName, nil
}

// Init performs metadata init.
func (a *AzureEventHubs) Init(metadata bindings.Metadata) error {
m, err := parseMetadata(metadata)
Expand Down Expand Up @@ -360,7 +383,13 @@ func (a *AzureEventHubs) RegisterPartitionedEventProcessor(ctx context.Context,
// RegisterEventProcessor - receive eventhub messages by eventprocessor
// host by balancing partitions.
func (a *AzureEventHubs) RegisterEventProcessor(ctx context.Context, handler bindings.Handler) error {
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, a.metadata.storageAccountName, a.metadata.storageContainerName, *a.azureEnvironment)
storagePrefix, err := a.getStoragePrefixString()
if err != nil {
return err
}

leaserPrefixOpt := storage.WithPrefixInBlobPath(storagePrefix)
leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(a.storageCredential, a.metadata.storageAccountName, a.metadata.storageContainerName, *a.azureEnvironment, leaserPrefixOpt)
if err != nil {
return err
}
Expand Down
35 changes: 35 additions & 0 deletions bindings/azure/eventhubs/eventhubs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,45 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)

var testLogger = logger.NewLogger("test")

func TestGetStoragePrefixString(t *testing.T) {
props := map[string]string{"storageAccountName": "fake", "storageAccountKey": "fake", "consumerGroup": "default", "storageContainerName": "test", "eventHub": "hubName", "eventHubNamespace": "fake"}

metadata := bindings.Metadata{Properties: props}
m, err := parseMetadata(metadata)

require.NoError(t, err)

aeh := &AzureEventHubs{logger: testLogger, metadata: m}

actual, _ := aeh.getStoragePrefixString()

assert.Equal(t, "dapr-hubName-default-", actual)
}

func TestGetStoragePrefixStringWithHubNameFromConnectionString(t *testing.T) {
connectionString := "Endpoint=sb://fake.servicebus.windows.net/;SharedAccessKeyName=fakeKey;SharedAccessKey=key;EntityPath=hubName"
props := map[string]string{"storageAccountName": "fake", "storageAccountKey": "fake", "consumerGroup": "default", "storageContainerName": "test", "connectionString": connectionString}

metadata := bindings.Metadata{Properties: props}
m, err := parseMetadata(metadata)

require.NoError(t, err)

aeh := &AzureEventHubs{logger: testLogger, metadata: m}

actual, _ := aeh.getStoragePrefixString()

assert.Equal(t, "dapr-hubName-default-", actual)
}

func TestParseMetadata(t *testing.T) {
t.Run("test valid configuration", func(t *testing.T) {
props := map[string]string{connectionString: "fake", consumerGroup: "mygroup", storageAccountName: "account", storageAccountKey: "key", storageContainerName: "container"}
Expand Down

0 comments on commit 805d9e4

Please sign in to comment.