From 37b326207a074f4d3a27dc6f61a63880cd022c52 Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Tue, 29 Nov 2022 15:09:03 +0530 Subject: [PATCH 1/4] Fix path variable for WSL Signed-off-by: Vighnesh Shenoy --- Makefile | 4 ++-- pkg/scalers/liiklus/LiiklusService.pb.go | 22 +++++++++---------- pkg/scalers/liiklus/LiiklusService_grpc.pb.go | 12 +++++----- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index a27af09f023..c807100c13e 100644 --- a/Makefile +++ b/Makefile @@ -149,8 +149,8 @@ clientset-generate: ## Generate client-go clientset, listers and informers. ./hack/update-codegen.sh proto-gen: protoc-gen ## Generate Liiklus, ExternalScaler and MetricsService proto - PATH=$(LOCALBIN):$(PATH) protoc -I vendor --proto_path=hack LiiklusService.proto --go_out=pkg/scalers/liiklus --go-grpc_out=pkg/scalers/liiklus - PATH=$(LOCALBIN):$(PATH) protoc -I vendor --proto_path=pkg/scalers/externalscaler externalscaler.proto --go_out=pkg/scalers/externalscaler --go-grpc_out=pkg/scalers/externalscaler + PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=hack LiiklusService.proto --go_out=pkg/scalers/liiklus --go-grpc_out=pkg/scalers/liiklus + PATH="$(LOCALBIN):$(PATH)" protoc -I vendor --proto_path=pkg/scalers/externalscaler externalscaler.proto --go_out=pkg/scalers/externalscaler --go-grpc_out=pkg/scalers/externalscaler .PHONY: mockgen-gen mockgen-gen: mockgen pkg/mock/mock_scaling/mock_interface.go pkg/mock/mock_scaler/mock_scaler.go pkg/mock/mock_scale/mock_interfaces.go pkg/mock/mock_client/mock_interfaces.go pkg/scalers/liiklus/mocks/mock_liiklus.go diff --git a/pkg/scalers/liiklus/LiiklusService.pb.go b/pkg/scalers/liiklus/LiiklusService.pb.go index b101d7cc6fc..18a9b752e4a 100644 --- a/pkg/scalers/liiklus/LiiklusService.pb.go +++ b/pkg/scalers/liiklus/LiiklusService.pb.go @@ -7,10 +7,10 @@ package liiklus import ( + empty "github.com/golang/protobuf/ptypes/empty" + timestamp "github.com/golang/protobuf/ptypes/timestamp" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" - emptypb "google.golang.org/protobuf/types/known/emptypb" - timestamppb "google.golang.org/protobuf/types/known/timestamppb" reflect "reflect" sync "sync" ) @@ -326,7 +326,6 @@ type SubscribeReply struct { unknownFields protoimpl.UnknownFields // Types that are assignable to Reply: - // // *SubscribeReply_Assignment Reply isSubscribeReply_Reply `protobuf_oneof:"reply"` } @@ -537,7 +536,6 @@ type ReceiveReply struct { unknownFields protoimpl.UnknownFields // Types that are assignable to Reply: - // // *ReceiveReply_Record_ Reply isReceiveReply_Reply `protobuf_oneof:"reply"` } @@ -807,11 +805,11 @@ type ReceiveReply_Record struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` - Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` - Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` - Timestamp *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Replay bool `protobuf:"varint,5,opt,name=replay,proto3" json:"replay,omitempty"` + Offset uint64 `protobuf:"varint,1,opt,name=offset,proto3" json:"offset,omitempty"` + Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` + Value []byte `protobuf:"bytes,3,opt,name=value,proto3" json:"value,omitempty"` + Timestamp *timestamp.Timestamp `protobuf:"bytes,4,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Replay bool `protobuf:"varint,5,opt,name=replay,proto3" json:"replay,omitempty"` } func (x *ReceiveReply_Record) Reset() { @@ -867,7 +865,7 @@ func (x *ReceiveReply_Record) GetValue() []byte { return nil } -func (x *ReceiveReply_Record) GetTimestamp() *timestamppb.Timestamp { +func (x *ReceiveReply_Record) GetTimestamp() *timestamp.Timestamp { if x != nil { return x.Timestamp } @@ -1073,8 +1071,8 @@ var file_LiiklusService_proto_goTypes = []interface{}{ (*ReceiveReply_Record)(nil), // 13: com.github.bsideup.liiklus.ReceiveReply.Record nil, // 14: com.github.bsideup.liiklus.GetOffsetsReply.OffsetsEntry nil, // 15: com.github.bsideup.liiklus.GetEndOffsetsReply.OffsetsEntry - (*timestamppb.Timestamp)(nil), // 16: google.protobuf.Timestamp - (*emptypb.Empty)(nil), // 17: google.protobuf.Empty + (*timestamp.Timestamp)(nil), // 16: google.protobuf.Timestamp + (*empty.Empty)(nil), // 17: google.protobuf.Empty } var file_LiiklusService_proto_depIdxs = []int32{ 0, // 0: com.github.bsideup.liiklus.SubscribeRequest.autoOffsetReset:type_name -> com.github.bsideup.liiklus.SubscribeRequest.AutoOffsetReset diff --git a/pkg/scalers/liiklus/LiiklusService_grpc.pb.go b/pkg/scalers/liiklus/LiiklusService_grpc.pb.go index d05170338af..6fe70e2b323 100644 --- a/pkg/scalers/liiklus/LiiklusService_grpc.pb.go +++ b/pkg/scalers/liiklus/LiiklusService_grpc.pb.go @@ -8,10 +8,10 @@ package liiklus import ( context "context" + empty "github.com/golang/protobuf/ptypes/empty" grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" - emptypb "google.golang.org/protobuf/types/known/emptypb" ) // This is a compile-time assertion to ensure that this generated file @@ -26,7 +26,7 @@ type LiiklusServiceClient interface { Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishReply, error) Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (LiiklusService_SubscribeClient, error) Receive(ctx context.Context, in *ReceiveRequest, opts ...grpc.CallOption) (LiiklusService_ReceiveClient, error) - Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) + Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*empty.Empty, error) GetOffsets(ctx context.Context, in *GetOffsetsRequest, opts ...grpc.CallOption) (*GetOffsetsReply, error) GetEndOffsets(ctx context.Context, in *GetEndOffsetsRequest, opts ...grpc.CallOption) (*GetEndOffsetsReply, error) } @@ -112,8 +112,8 @@ func (x *liiklusServiceReceiveClient) Recv() (*ReceiveReply, error) { return m, nil } -func (c *liiklusServiceClient) Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) { - out := new(emptypb.Empty) +func (c *liiklusServiceClient) Ack(ctx context.Context, in *AckRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) err := c.cc.Invoke(ctx, "/com.github.bsideup.liiklus.LiiklusService/Ack", in, out, opts...) if err != nil { return nil, err @@ -146,7 +146,7 @@ type LiiklusServiceServer interface { Publish(context.Context, *PublishRequest) (*PublishReply, error) Subscribe(*SubscribeRequest, LiiklusService_SubscribeServer) error Receive(*ReceiveRequest, LiiklusService_ReceiveServer) error - Ack(context.Context, *AckRequest) (*emptypb.Empty, error) + Ack(context.Context, *AckRequest) (*empty.Empty, error) GetOffsets(context.Context, *GetOffsetsRequest) (*GetOffsetsReply, error) GetEndOffsets(context.Context, *GetEndOffsetsRequest) (*GetEndOffsetsReply, error) mustEmbedUnimplementedLiiklusServiceServer() @@ -165,7 +165,7 @@ func (UnimplementedLiiklusServiceServer) Subscribe(*SubscribeRequest, LiiklusSer func (UnimplementedLiiklusServiceServer) Receive(*ReceiveRequest, LiiklusService_ReceiveServer) error { return status.Errorf(codes.Unimplemented, "method Receive not implemented") } -func (UnimplementedLiiklusServiceServer) Ack(context.Context, *AckRequest) (*emptypb.Empty, error) { +func (UnimplementedLiiklusServiceServer) Ack(context.Context, *AckRequest) (*empty.Empty, error) { return nil, status.Errorf(codes.Unimplemented, "method Ack not implemented") } func (UnimplementedLiiklusServiceServer) GetOffsets(context.Context, *GetOffsetsRequest) (*GetOffsetsReply, error) { From 1afef6e00cbe5158f398e32724e87d39d8fe457c Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Tue, 29 Nov 2022 15:11:52 +0530 Subject: [PATCH 2/4] Append EntityPath if not present Signed-off-by: Vighnesh Shenoy --- pkg/scalers/azure_eventhub_scaler.go | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/pkg/scalers/azure_eventhub_scaler.go b/pkg/scalers/azure_eventhub_scaler.go index 281451c75b9..fe21add22e0 100644 --- a/pkg/scalers/azure_eventhub_scaler.go +++ b/pkg/scalers/azure_eventhub_scaler.go @@ -193,15 +193,33 @@ func parseAzureEventHubAuthenticationMetadata(logger logr.Logger, config *Scaler return fmt.Errorf("no storage connection string given") } + connection := "" if config.AuthParams["connection"] != "" { - meta.eventHubInfo.EventHubConnection = config.AuthParams["connection"] + connection = config.AuthParams["connection"] } else if config.TriggerMetadata["connectionFromEnv"] != "" { - meta.eventHubInfo.EventHubConnection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] + connection = config.ResolvedEnv[config.TriggerMetadata["connectionFromEnv"]] } - if len(meta.eventHubInfo.EventHubConnection) == 0 { + if len(connection) == 0 { return fmt.Errorf("no event hub connection string given") } + + if !strings.Contains(connection, "EntityPath") { + eventHubName := "" + if config.TriggerMetadata["eventHubName"] != "" { + eventHubName = config.TriggerMetadata["eventHubName"] + } else if config.TriggerMetadata["eventHubNameFromEnv"] != "" { + eventHubName = config.ResolvedEnv[config.TriggerMetadata["eventHubNameFromEnv"]] + } + + if eventHubName == "" { + return fmt.Errorf("connection string does not contain event hub name, and parameter eventHubName not provided") + } + + connection = fmt.Sprintf("%s;EntityPath=%s", connection, eventHubName) + } + + meta.eventHubInfo.EventHubConnection = connection case v1alpha1.PodIdentityProviderAzure, v1alpha1.PodIdentityProviderAzureWorkload: meta.eventHubInfo.StorageAccountName = "" if val, ok := config.TriggerMetadata["storageAccountName"]; ok { From f438b5bd2be0edfc2948e1ce5fa749fc27d9d215 Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Tue, 29 Nov 2022 16:28:41 +0530 Subject: [PATCH 3/4] Update CHANGELOG. Signed-off-by: Vighnesh Shenoy --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index bc07fd498d9..a54cb2cd38a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -61,6 +61,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio - **Apache Kafka Scaler:** SASL/OAuthbearer Implementation ([#3681](https://github.com/kedacore/keda/issues/3681)) - **Azure AD Pod Identity Authentication:** Improve error messages to emphasize problems around the integration with aad-pod-identity itself ([#3610](https://github.com/kedacore/keda/issues/3610)) - **Azure Event Hub Scaler:** Support Azure Active Direcotry Pod & Workload Identity for Storage Blobs ([#3569](https://github.com/kedacore/keda/issues/3569)) +- **Azure Event Hub Scaler:** Support using connection strings for Event Hub namespace instead of the Event Hub itself. ([#3922](https://github.com/kedacore/keda/issues/3922)) - **Azure Pipelines Scaler:** Improved speed of profiling large set of Job Requests from Azure Pipelines ([#3702](https://github.com/kedacore/keda/issues/3702)) - **GCP Storage Scaler:** Add prefix and delimiter support ([#3756](https://github.com/kedacore/keda/issues/3756)) - **Metrics API Scaler:** Add unsafeSsl paramater to skip certificate validation when connecting over HTTPS ([#3728](https://github.com/kedacore/keda/discussions/3728)) From fae49631c88b3c7a99cdf54bbd219bece81e7ae8 Mon Sep 17 00:00:00 2001 From: Vighnesh Shenoy Date: Tue, 29 Nov 2022 16:54:29 +0530 Subject: [PATCH 4/4] Update unit tests Signed-off-by: Vighnesh Shenoy --- pkg/scalers/azure_eventhub_scaler_test.go | 169 +++++++++++++++++----- 1 file changed, 134 insertions(+), 35 deletions(-) diff --git a/pkg/scalers/azure_eventhub_scaler_test.go b/pkg/scalers/azure_eventhub_scaler_test.go index 38ba4bdb38b..d00e1edf6d3 100644 --- a/pkg/scalers/azure_eventhub_scaler_test.go +++ b/pkg/scalers/azure_eventhub_scaler_test.go @@ -20,6 +20,7 @@ const ( eventHubConsumerGroup = "testEventHubConsumerGroup" eventHubConnectionSetting = "testEventHubConnectionSetting" storageConnectionSetting = "testStorageConnectionSetting" + eventHubsConnection = "Endpoint=sb://testEventHubNamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=testKey;EntityPath=testEventHub" serviceBusEndpointSuffix = "serviceBusEndpointSuffix" storageEndpointSuffix = "storageEndpointSuffix" activeDirectoryEndpoint = "activeDirectoryEndpoint" @@ -31,8 +32,9 @@ const ( ) type parseEventHubMetadataTestData struct { - metadata map[string]string - isError bool + metadata map[string]string + resolvedEnv map[string]string + isError bool } type eventHubMetricIdentifier struct { @@ -41,66 +43,163 @@ type eventHubMetricIdentifier struct { name string } -var sampleEventHubResolvedEnv = map[string]string{eventHubConnectionSetting: "none", storageConnectionSetting: "none"} +var sampleEventHubResolvedEnv = map[string]string{eventHubConnectionSetting: eventHubsConnection, storageConnectionSetting: "none"} var parseEventHubMetadataDataset = []parseEventHubMetadataTestData{ - {map[string]string{}, true}, + { + metadata: map[string]string{}, + isError: true, + }, // properly formed event hub metadata - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // missing event hub connection setting - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15"}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15"}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // missing storage connection setting - {map[string]string{"consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true}, + { + metadata: map[string]string{"consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // missing event hub consumer group - should replace with default - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // missing unprocessed event threshold - should replace with default - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // invalid activation unprocessed event threshold - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "activationUnprocessedEventThreshold": "AA"}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "activationUnprocessedEventThreshold": "AA"}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // added blob container details - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "azureFunction"}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "blobContainer": testContainerName, "checkpointStrategy": "azureFunction"}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, + // connection string without EntityPath, no event hub name provided + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, + resolvedEnv: map[string]string{eventHubConnectionSetting: "Endpoint=sb://testEventHubNamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=testKey;", storageConnectionSetting: "none"}, + isError: true, + }, + // connection string without EntityPath, event hub name provided + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName}, + resolvedEnv: map[string]string{eventHubConnectionSetting: "Endpoint=sb://testEventHubNamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=testKey;", storageConnectionSetting: "none"}, + isError: false, + }, } var parseEventHubMetadataDatasetWithPodIdentity = []parseEventHubMetadataTestData{ - {map[string]string{}, true}, + { + metadata: map[string]string{}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // Even though connection string is provided, this should fail because the eventhub Namespace is not provided explicitly when using Pod Identity - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "connectionFromEnv": eventHubConnectionSetting, "unprocessedEventThreshold": "15"}, + isError: true}, // properly formed event hub metadata with Pod Identity - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // missing eventHubname - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubNamespace": testEventHubNamespace}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // missing eventHubNamespace - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true}, // metadata with cloud specified - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, - "eventHubNamespace": testEventHubNamespace, "cloud": "azurePublicCloud"}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "azurePublicCloud"}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // metadata with private cloud missing service bus endpoint suffix and active directory endpoint and eventHubResourceURL - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, - "eventHubNamespace": testEventHubNamespace, "cloud": "private"}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private"}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // metadata with private cloud missing active directory endpoint and resourceURL - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, - "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true}, // metadata with private cloud missing service bus endpoint suffix and resource URL - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, - "eventHubNamespace": testEventHubNamespace, "cloud": "private", "activeDirectoryEndpoint": activeDirectoryEndpoint}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private", "activeDirectoryEndpoint": activeDirectoryEndpoint}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true}, // metadata with private cloud missing service bus endpoint suffix and active directory endpoint - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, - "eventHubNamespace": testEventHubNamespace, "cloud": "private", "eventHubResourceURL": eventHubResourceURL}, true}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private", "eventHubResourceURL": eventHubResourceURL}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // properly formed metadata with private cloud - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, - "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace, "cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // properly formed event hub metadata with Pod Identity and no storage connection string - {map[string]string{"storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + { + metadata: map[string]string{"storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // event hub metadata with Pod Identity, no storage connection string, no storageAccountName - should fail - {map[string]string{"consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true}, + { + metadata: map[string]string{"consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true, + }, // event hub metadata with Pod Identity, no storage connection string, empty storageAccountName - should fail - {map[string]string{"storageAccount": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true}, + { + metadata: map[string]string{"storageAccount": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true}, // event hub metadata with Pod Identity, storage connection string, empty storageAccountName - should ignore pod identity for blob storage and succeed - {map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "storageAccountName": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + { + metadata: map[string]string{"storageConnectionFromEnv": storageConnectionSetting, "storageAccountName": "", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, // event hub metadata with Pod Identity and no storage connection string, private cloud and no storageEndpointSuffix - should fail - {map[string]string{"cloud": "private", "storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, true}, + { + metadata: map[string]string{"cloud": "private", "storageAccountName": "blobstorage", "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: true}, // properly formed event hub metadata with Pod Identity and no storage connection string, private cloud and storageEndpointSuffix - {map[string]string{"cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL, "storageAccountName": "aStorageAccount", "storageEndpointSuffix": storageEndpointSuffix, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, false}, + { + metadata: map[string]string{"cloud": "private", "endpointSuffix": serviceBusEndpointSuffix, "activeDirectoryEndpoint": activeDirectoryEndpoint, "eventHubResourceURL": eventHubResourceURL, "storageAccountName": "aStorageAccount", "storageEndpointSuffix": storageEndpointSuffix, "consumerGroup": eventHubConsumerGroup, "unprocessedEventThreshold": "15", "eventHubName": testEventHubName, "eventHubNamespace": testEventHubNamespace}, + resolvedEnv: sampleEventHubResolvedEnv, + isError: false, + }, } var eventHubMetricIdentifiers = []eventHubMetricIdentifier{ @@ -120,7 +219,7 @@ var testEventHubScaler = azureEventHubScaler{ func TestParseEventHubMetadata(t *testing.T) { // Test first with valid resolved environment for _, testData := range parseEventHubMetadataDataset { - _, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: sampleEventHubResolvedEnv, AuthParams: map[string]string{}}) + _, err := parseAzureEventHubMetadata(logr.Discard(), &ScalerConfig{TriggerMetadata: testData.metadata, ResolvedEnv: testData.resolvedEnv, AuthParams: map[string]string{}}) if err != nil && !testData.isError { t.Errorf("Expected success but got error: %s", err)