Unable to create Scaled Object with Oauthbearer SASL type with Confluent Cloud cluster #5757

Closed
acartag7 opened this issue Apr 30, 2024 · 12 comments

@acartag7

Report

We are trying to set up a Kafka ScaledObject with our Confluent Cloud dedicated cluster using SASL/OAUTHBEARER authentication, and we are getting authentication failures.

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}

I've tried following the documentation here, but there seems to be a problem with the extensions part. During my tests, I saw the following:

If I set oauthExtensions: invalid=nothing, I get the usual error saying that logicalCluster is missing a cluster ID:

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY"}

Now, if I set oauthExtensions: extension_identityPoolId=pool-ebYj without the cluster ID, I get the authentication failed message (the same happens if I set both the pool ID and the cluster ID, as in the manifests below):

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}

I know it's not the credentials, because when I input incorrect credentials I get the following message in the operator, coming directly from Microsoft Entra ID:

"error": "error creating kafka client: kafka: client has run out of available brokers to talk to: oauth2: \"unauthorized_client\" \"XXXXXXXXX: Application with identifier 'YYYYY-YYYY-YYYY-YYYY-YYYYYYYYYxxx' was not found in the directory 'XXX AG'. This can happen if the application has not been installed by the administrator of the tenant or consented to by any user in the tenant. You may have sent your authentication request to the wrong tenant. Trace ID: xxxx-xxxx-xxx-xxxx Correlation ID: xxxx-xxxx-xxx-xxxx Timestamp: 2024-04-30 16:00:34Z\" \"https://login.microsoftonline.com/error?code=700016\""}

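The reasoning above (which hop rejected the request) can be sketched as a small string classifier. This is only an illustration built on the substrings visible in the messages quoted in this report; the function name `failureStage` and the stage labels are hypothetical, and other KEDA or broker versions may word their errors differently.

```go
package main

import (
	"fmt"
	"strings"
)

// failureStage guesses which hop rejected the request, based only on
// substrings that appear in the error messages quoted in this issue.
func failureStage(errMsg string) string {
	switch {
	case strings.Contains(errMsg, "oauth2:"):
		// The token endpoint (here Entra ID) refused the client credentials.
		return "identity provider: token request rejected"
	case strings.Contains(errMsg, "extensions are invalid"):
		// Checked before the generic SASL case, because this message
		// also contains "SASL Authentication failed".
		return "broker: SASL extensions rejected"
	case strings.Contains(errMsg, "SASL Authentication failed"):
		// A token was obtained, but the broker did not accept the login.
		return "broker: authentication rejected"
	default:
		return "unknown"
	}
}

func main() {
	msgs := []string{
		`oauth2: "unauthorized_client" "Application with identifier ... was not found"`,
		"kafka server: SASL Authentication failed: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY",
		"kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER",
	}
	for _, m := range msgs {
		fmt.Println(failureStage(m))
	}
}
```
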
The ScaledObject using SASL plaintext and the API keys works without issues, but we can't use this auth method in our setup.

I think this issue hasn't been reported before. Any ideas on what I could try?

Expected Behavior

The Kafka scaler is active, in ready status and the deployments scale properly.

Actual Behavior

Authentication fails with SASL/OAUTHBEARER.

Steps to Reproduce the Problem

To test this you will need a Kafka cluster that you authenticate to with SASL/OAUTHBEARER.
Deploy the manifests below:

kind: Secret
apiVersion: v1
metadata:
  name: keda-secrets-0004
stringData:
  username: superusername
  password: superpassword
  oauthTokenEndpointUri: https://login.microsoftonline.com/tenant_id/oauth2/token
  scopes: "superusername/.default"
  oauthExtensions: extension_logicalCluster=lkc-x8ff65,extension_identityPoolId=pool-xyzs
type: Opaque
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: trigger-authentication-0004
spec:
  secretTargetRef:
    - parameter: username
      name: keda-secrets-0004
      key: username
    - parameter: password
      name: keda-secrets-0004
      key: password
    - parameter: oauthTokenEndpointUri
      name: keda-secrets-0004
      key: oauthTokenEndpointUri
    - parameter: scopes
      name: keda-secrets-0004
      key: scopes
    - parameter: oauthExtensions
      name: keda-secrets-0004
      key: oauthExtensions
---
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: scaler-0004
spec:
  minReplicaCount: 0
  maxReplicaCount: 3
  scaleTargetRef:
    name: kafka-inspector-keda-test
    apiVersion: apps/v1
    kind: Deployment
  pollingInterval: 60
  triggers:
    - type: kafka
      authenticationRef:
        name: trigger-authentication-0004
      metadata:
        bootstrapServers: lkc-x8ff65.privdomxxxxx.eu-central-1.aws.confluent.cloud:9092
        topic: test-topic
        consumerGroup: app.myconsumer-dev
        lagThreshold: '50'
        activationLagThreshold: '0'
        offsetResetPolicy: latest
        sasl: "oauthbearer"
        tls: "enable"
---
# create simple nginx deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-inspector-keda-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-inspector-keda-test
  template:
    metadata:
      labels:
        app: kafka-inspector-keda-test
    spec:
      containers:
      - name: nginx
        image: nginx:latest
        ports:
        - containerPort: 80

Logs from KEDA operator

2024-04-30T15:13:57Z    INFO    Reconciling ScaledObject        {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04"}
2024-04-30T15:13:57Z    INFO    Creating a new HPA      {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "HPA.Namespace": "default", "HPA.Name": "keda-hpa-scaler-0004"}
2024-04-30T15:14:19Z    ERROR   scale_handler   error resolving auth params     {"type": "ScaledObject", "namespace": "default", "name": "scaler-0004", "scalerIndex": 0, "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   Error getting scalers   {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   Failed to create new HPA resource       {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "HPA.Namespace": "default", "HPA.Name": "keda-hpa-scaler-0004", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   failed to ensure HPA is correctly created for ScaledObject      {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}
2024-04-30T15:14:19Z    ERROR   Reconciler error        {"controller": "scaledobject", "controllerGroup": "keda.sh", "controllerKind": "ScaledObject", "ScaledObject": {"name":"scaler-0004","namespace":"default"}, "namespace": "default", "name": "scaler-0004", "reconcileID": "16bc803b-9af0-4f8a-8685-268e32428f04", "error": "error creating kafka client: kafka: client has run out of available brokers to talk to: kafka server: SASL Authentication failed: Authentication failed during authentication due to invalid credentials with SASL mechanism OAUTHBEARER"}

KEDA Version

2.14.0

Kubernetes Version

1.27

Platform

Any

Scaler Details

Kafka

Anything else?

No response

@acartag7 acartag7 added the bug Something isn't working label Apr 30, 2024
@JorTurFer
Member

@dttung2905 @zroubalik , you are the kafka experts 😄

@fabiodellanna

Hi all, I experienced the same issue reported by @acartag7.
I'm not able to autoscale against a cluster with OAUTHBEARER authentication. Any news so far?

@dttung2905
Contributor

Sorry, I missed this. Let me take a look at this in the next few days. I'll probably need to create a local Strimzi Kafka cluster with SASL/OAUTHBEARER authentication first 😆

@dttung2905
Contributor

I just spent some time looking through the code but couldn't find anything yet. I think that's partly because I haven't been able to set up a local Strimzi cluster with SASL/OAUTHBEARER authentication. 🤦

Hi @adrien-f, I see you recently contributed PR #5692. I think you might already have a Kafka cluster set up with SASL/OAUTHBEARER and have a much better understanding of this part of the code than I do 😄 Do you mind helping us take a look at this too?

@adrien-f
Contributor

adrien-f commented Jun 3, 2024

Greetings! I'd gladly help as much as possible!

So, as a reference, if not configured for MSK IAM, the scaler will use the Kafka OAuthBearerTokenProvider:

config.Net.SASL.TokenProvider = kafka.OAuthBearerTokenProvider(metadata.username, metadata.password, metadata.oauthTokenEndpointURI, metadata.scopes, metadata.oauthExtensions)

Since the errors mention OAUTHBEARER, we can assume that the configuration was parsed properly. We can also add a unit test to verify that later. What's left is to try out the token provider. You could try this manually (code written by hand):

package main

import (
	"context"
	"fmt"

	"github.com/IBM/sarama"
	"golang.org/x/oauth2"
	"golang.org/x/oauth2/clientcredentials"
)

// TokenProvider is a sarama access token provider that can also name itself.
type TokenProvider interface {
	sarama.AccessTokenProvider
	String() string
}

type oauthBearerTokenProvider struct {
	tokenSource oauth2.TokenSource
	extensions  map[string]string
}

// OAuthBearerTokenProvider builds a provider that fetches tokens with the
// OAuth 2.0 client credentials flow and attaches the given SASL extensions.
func OAuthBearerTokenProvider(clientID, clientSecret, tokenURL string, scopes []string, extensions map[string]string) TokenProvider {
	cfg := clientcredentials.Config{
		ClientID:     clientID,
		ClientSecret: clientSecret,
		TokenURL:     tokenURL,
		Scopes:       scopes,
	}

	return &oauthBearerTokenProvider{
		tokenSource: cfg.TokenSource(context.Background()),
		extensions:  extensions,
	}
}

// Token fetches (or reuses) an access token and wraps it for sarama.
func (o *oauthBearerTokenProvider) Token() (*sarama.AccessToken, error) {
	token, err := o.tokenSource.Token()
	if err != nil {
		return nil, err
	}

	return &sarama.AccessToken{Token: token.AccessToken, Extensions: o.extensions}, nil
}

func (o *oauthBearerTokenProvider) String() string {
	return "OAuthBearer"
}

func main() {
	// Placeholder values taken from the Secret in this issue; replace them
	// with your own client ID, client secret, token URL, and scopes.
	tp := OAuthBearerTokenProvider(
		"xxx", // client ID (the "username" parameter)
		"xxx", // client secret (the "password" parameter)
		"https://login.microsoftonline.com/tenant_id/oauth2/token",
		[]string{"superusername/.default"},
		nil, // extensions are only attached to the result, not sent to the token endpoint
	)
	token, err := tp.Token()
	if err != nil {
		fmt.Println("could not get token", err)
	} else {
		fmt.Println("got token", token)
	}
}

If we do indeed have a Token, we'll have to dig deeper 😅
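
If a token does come back, one possible next step is to look at its claims. The sketch below assumes the access token is a JWT (three dot-separated base64url segments), which is not confirmed anywhere in this thread; `decodeClaims` is a hypothetical helper, it does not verify the signature, and it is meant for debugging only. The token built in `main` is a dummy so the example runs offline.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"strings"
)

// decodeClaims returns the payload of a JWT as a map.
// It does NOT verify the signature; use it only to inspect a token.
func decodeClaims(jwt string) (map[string]any, error) {
	parts := strings.Split(jwt, ".")
	if len(parts) != 3 {
		return nil, fmt.Errorf("expected 3 dot-separated segments, got %d", len(parts))
	}
	// JWT segments are base64url-encoded without padding.
	payload, err := base64.RawURLEncoding.DecodeString(parts[1])
	if err != nil {
		return nil, fmt.Errorf("decoding payload: %w", err)
	}
	var claims map[string]any
	if err := json.Unmarshal(payload, &claims); err != nil {
		return nil, fmt.Errorf("parsing payload: %w", err)
	}
	return claims, nil
}

func main() {
	// Dummy, unsigned token with made-up claims, only to exercise the helper.
	payload := base64.RawURLEncoding.EncodeToString(
		[]byte(`{"aud":"example-audience","iss":"https://issuer.example"}`))
	token := "e30." + payload + ".sig"

	claims, err := decodeClaims(token)
	if err != nil {
		fmt.Println("could not decode token:", err)
		return
	}
	fmt.Println("aud:", claims["aud"])
	fmt.Println("iss:", claims["iss"])
}
```

With a real token, the string to pass in would be the `Token` field of the `sarama.AccessToken` returned by the script above.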

@djmacken557

Hello, we are encountering the same problem while trying to set up a KEDA scaler using OAUTHBEARER with Confluent Cloud. SASL plaintext also works for us, as the original poster described. Are there any updates on this issue?

@adrien-f
Contributor

Hello @djmacken557!

Would it be possible to run the debugging script I mentioned, to ensure the token is indeed fetched? The idea is to validate that the token can be fetched before going further into the library.

@rasifmahmud

rasifmahmud commented Jun 24, 2024

@adrien-f @dttung2905 getting rid of the extension_ prefix solved the issue for us. For instance, try this:
oauthExtensions: logicalCluster=lkc-x8ff65,identityPoolId=pool-xyzs
Let's get the documentation updated. I am assuming it was tested with that prefix before? If that used to work but no longer does, then I believe Confluent has changed this server-side recently.
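
To illustrate why the prefix matters, here is a minimal sketch of how an extension string could end up on the wire. It assumes the scaler splits the value on commas and `=` and forwards the keys unchanged, which is consistent with the behavior reported here but has not been checked against the KEDA source; `parseExtensions` and `clientFirstMessage` are hypothetical names. The message layout follows the SASL/OAUTHBEARER initial client response from RFC 7628, with extensions as `\x01`-separated key=value pairs.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// parseExtensions turns "k1=v1,k2=v2" into a map, keeping keys verbatim.
func parseExtensions(raw string) (map[string]string, error) {
	out := make(map[string]string)
	if raw == "" {
		return out, nil
	}
	for _, pair := range strings.Split(raw, ",") {
		key, value, ok := strings.Cut(pair, "=")
		if !ok || key == "" {
			return nil, fmt.Errorf("malformed extension %q, want key=value", pair)
		}
		out[key] = value
	}
	return out, nil
}

// clientFirstMessage renders the OAUTHBEARER initial client response:
// "n,," + \x01 + "auth=Bearer <token>" + (\x01 + key=value)* + \x01\x01.
func clientFirstMessage(token string, ext map[string]string) string {
	keys := make([]string, 0, len(ext))
	for k := range ext {
		keys = append(keys, k)
	}
	sort.Strings(keys) // sorted only to make the example output stable

	var b strings.Builder
	b.WriteString("n,,\x01auth=Bearer " + token)
	for _, k := range keys {
		b.WriteString("\x01" + k + "=" + ext[k])
	}
	b.WriteString("\x01\x01")
	return b.String()
}

func main() {
	for _, raw := range []string{
		"extension_logicalCluster=lkc-x8ff65,extension_identityPoolId=pool-xyzs",
		"logicalCluster=lkc-x8ff65,identityPoolId=pool-xyzs",
	} {
		ext, err := parseExtensions(raw)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("%q\n", clientFirstMessage("TOKEN", ext))
	}
}
```

Under these assumptions, the first input sends a key literally named `extension_logicalCluster`, so a broker looking for `logicalCluster` would not find it.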

@zroubalik
Member

@rasifmahmud thanks for the confirmation :) Are you willing to update the docs?

@adrien-f do you think it makes sense to add a unit test for this?

@rasifmahmud

rasifmahmud commented Jun 26, 2024

@zroubalik sure, I will raise a PR

stale bot commented Aug 27, 2024

This issue has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. Thank you for your contributions.

@stale stale bot added the stale All issues that are marked as stale due to inactivity label Aug 27, 2024

stale bot commented Sep 5, 2024

This issue has been automatically closed due to inactivity.

@stale stale bot closed this as completed Sep 5, 2024