
OauthBearer validation filter #1195

Merged
6 commits merged into kroxylicious:main, Jun 5, 2024

Conversation

Contributor

@callaertanthony callaertanthony commented May 9, 2024

Type of change

  • Enhancement / new feature

Description

This pull request introduces OAuthBearer management into Kroxylicious.
There are two ideas for this:

  • Add a new OauthBearerFilter that checks whether a token is valid before forwarding it upstream.
  • Re-enable AuthnHandler and add OAUTHBEARER support.

Additional Context

  • OauthBearerFilter is simple and really useful for a Kafka proxy: it helps relieve the upstream Kafka cluster when downstream clients attempt too many invalid authentications. If the token is valid, it simply forwards the SASL request upstream, letting the upstream Kafka cluster validate it again and manage RBAC itself. (A configuration sketch follows this list.)
  • The second approach fully delegates the SASL connection to Kroxylicious. Kroxylicious catches all SASL requests and handles the connection itself, then uses a separate connection to upstream under a new upstream role.
    I think this requires RBAC management; otherwise the use case is just proxying OAUTHBEARER to ANY connection, losing the OAUTHBEARER principal and using another one from the upstream connection (or none if no authentication is configured).
    AuthnHandler is trickier: at this time it doesn't allow different OAUTHBEARER configs per VirtualCluster, so I think a refactor is required on that part.
    Edit: I moved the SASL connection part to another PR: handle OAUTHBEARER SASL connection #1206
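
To make the first idea concrete, here is a rough sketch of how the validation filter might be wired into a Kroxylicious configuration. The filter type name and property names (OauthBearerValidation, jwksEndpointUrl, expectedAudience, expectedIssuer) are illustrative assumptions; the adoc added in this PR documents the actual schema.

# Hypothetical configuration sketch -- type and property names are assumptions,
# not necessarily the schema merged in this PR.
filters:
  - type: OauthBearerValidation
    config:
      jwksEndpointUrl: https://dev-example.us.auth0.com/.well-known/jwks.json  # where to fetch token signing keys
      expectedAudience: https://dev-example.us.auth0.com/api/v2/               # accepted aud claim
      expectedIssuer: https://dev-example.us.auth0.com/                        # accepted iss claim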

Checklist

Please go through this checklist and make sure all applicable tasks have been done

  • Write tests
  • Make sure all tests pass
  • Review performance test results. Ensure that any degradations to performance numbers are understood and justified.
  • Make sure all Sonarcloud warnings are addressed or are justifiably ignored.
  • Update documentation
  • Reference relevant issue(s) and close them after merging
  • For user facing changes, update CHANGELOG.md (remember to include changes affecting the API of the test artefacts too).

@callaertanthony callaertanthony force-pushed the feat/oauth-proxy branch 8 times, most recently from 3c099bf to ae95b4a (May 12, 2024 21:50)
Contributor

@tombentley tombentley left a comment


This looks like great progress, thank you!

docs/available-filters/oauthbearer/oauthbearer.adoc (review thread: outdated, resolved)
@@ -742,7 +747,7 @@ else if (offsetFetchRequestData.topics() != null) {
.build(apiVersion);
break;
case UPDATE_METADATA:
req = new UpdateFeaturesRequest((UpdateFeaturesRequestData) reqBody, apiVersion);
req = new UpdateFeaturesRequest((UpdateFeaturesRequestData) reqBody, apiVersion); // TODO: check why
Contributor


If you mean, "why it's not using the builder", I'm not sure there is a reason. Happy to switch to using the builder if it doesn't fail the tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I first read it, I didn't understand why the UPDATE_METADATA case returns an UpdateFeaturesRequest rather than an UpdateMetadataRequest. Is that intentional?

@callaertanthony callaertanthony force-pushed the feat/oauth-proxy branch 7 times, most recently from 5b2c6fe to 24f7a35 (May 14, 2024 06:39)
@k-wall
Contributor

k-wall commented May 14, 2024

This looks really useful to me. I know in a past life I'd have loved to have been able to protect my Kafka cluster from clients hammering it with duff tokens.

Contributor

@tombentley tombentley left a comment


Thanks again @callaertanthony, I took a closer look and left a few comments. I don't think this is very far from being mergeable.

docs/available-filters/oauthbearer/oauthbearer.adoc (review thread: outdated, resolved)
docs/available-filters/oauthbearer/oauthbearer.adoc (review thread: outdated, resolved)
kroxylicious-filters/kroxylicious-oauthbearer/pom.xml (review thread: outdated, resolved)
<id>analyze</id>
<configuration>
<ignoredUnusedDeclaredDependencies>
<ignoredUnusedDeclaredDependency>org.bitbucket.b_c:jose4j</ignoredUnusedDeclaredDependency>
Contributor


Can you explain how this is actually being used?

Contributor Author


This dependency is used by Kafka during token validation, but it is not found by Kroxylicious on the classpath. We must declare it as a dependency of the project even though we don't use it directly; it's used by Kafka's OAuthBearerValidatorCallbackHandler.
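
For reference, the declaration being ignored above would look something like this in the module's pom.xml. The groupId matches the ignore entry quoted earlier, and 0.9.6 matches the jose4j version visible in the stack trace later in this thread; the runtime scope is an assumption.

<!-- Pulled in only so Kafka's OAuthBearerValidatorCallbackHandler finds it at runtime;
     not referenced directly by Kroxylicious code, hence the dependency:analyze ignore. -->
<dependency>
    <groupId>org.bitbucket.b_c</groupId>
    <artifactId>jose4j</artifactId>
    <version>0.9.6</version>
    <scope>runtime</scope>
</dependency>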

Contributor

@k-wall k-wall left a comment


One comment left. There's also one Sonar issue. But otherwise the code LGTM.
Can you add an entry to the CHANGELOG?

@callaertanthony callaertanthony force-pushed the feat/oauth-proxy branch 3 times, most recently from ab80548 to 651f569 (May 27, 2024 06:58)
@k-wall
Contributor

k-wall commented May 27, 2024

I ran up a test case using auth0.com as the OAuth server. I noticed a problem: the token validation is failing (it is an audience issue, but the precise detail of the error is unimportant), yet the filter is still relaying the SaslAuthenticateRequest to the broker.

I'm seeing this in the logs:

2024-05-27 15:28:08 <KQueueEventLoopGroup-5-1> WARN  org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler:187 - Could not validate the access token: JWT (claims->{"iss":"https://dev-j2kfifzhr0wa7o3h.us.auth0.com/","sub":"8adOD5hy3CBTAYUel6RO54b6M1mvP1W8@clients","aud":"https://dev-j2kfifzhr0wa7o3h.us.auth0.com/api/v2/","iat":1716820056,"exp":1716906456,"gty":"client-credentials","azp":"8adOD5hy3CBTAYUel6RO54b6M1mvP1W8"}) rejected due to invalid claims or other invalid content. Additional details: [[8] Audience (aud) claim [https://dev-j2kfifzhr0wa7o3h.us.auth0.com/api/v2/] present in the JWT but no expected audience value(s) were provided to the JWT Consumer. Expected one of [] as an aud value.]
org.apache.kafka.common.security.oauthbearer.internals.secured.ValidateException: Could not validate the access token: JWT (claims->{"iss":"https://dev-j2kfifzhr0wa7o3h.us.auth0.com/","sub":"8adOD5hy3CBTAYUel6RO54b6M1mvP1W8@clients","aud":"https://dev-j2kfifzhr0wa7o3h.us.auth0.com/api/v2/","iat":1716820056,"exp":1716906456,"gty":"client-credentials","azp":"8adOD5hy3CBTAYUel6RO54b6M1mvP1W8"}) rejected due to invalid claims or other invalid content. Additional details: [[8] Audience (aud) claim [https://dev-j2kfifzhr0wa7o3h.us.auth0.com/api/v2/] present in the JWT but no expected audience value(s) were provided to the JWT Consumer. Expected one of [] as an aud value.]
	at org.apache.kafka.common.security.oauthbearer.internals.secured.ValidatorAccessTokenValidator.validate(ValidatorAccessTokenValidator.java:158) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.handleValidatorCallback(OAuthBearerValidatorCallbackHandler.java:184) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallbackHandler.handle(OAuthBearerValidatorCallbackHandler.java:169) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer.process(OAuthBearerSaslServer.java:156) ~[kafka-clients-3.7.0.jar:?]
	at org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer.evaluateResponse(OAuthBearerSaslServer.java:101) ~[kafka-clients-3.7.0.jar:?]
	at io.kroxylicious.proxy.filter.oauthbearer.OauthBearerValidationFilter.doAuthenticate(OauthBearerValidationFilter.java:172) ~[classes/:?]
	at io.kroxylicious.proxy.filter.oauthbearer.OauthBearerValidationFilter.lambda$authenticate$2(OauthBearerValidationFilter.java:153) ~[classes/:?]
	at io.kroxylicious.proxy.filter.oauthbearer.OauthBearerValidationFilter.schedule(OauthBearerValidationFilter.java:182) ~[classes/:?]
	at io.kroxylicious.proxy.filter.oauthbearer.OauthBearerValidationFilter.authenticate(OauthBearerValidationFilter.java:151) ~[classes/:?]
	at io.kroxylicious.proxy.filter.oauthbearer.OauthBearerValidationFilter.onSaslAuthenticateRequest(OauthBearerValidationFilter.java:108) ~[classes/:?]
	at io.kroxylicious.proxy.filter.SaslAuthenticateRequestFilterInvoker.onRequest(SaslAuthenticateRequestFilterInvoker.java:47) ~[classes/:?]
	at io.kroxylicious.proxy.filter.SpecificFilterArrayInvoker.onRequest(SpecificFilterArrayInvoker.java:649) ~[classes/:?]
	at io.kroxylicious.proxy.filter.SafeInvoker.onRequest(SafeInvoker.java:34) ~[classes/:?]
	at io.kroxylicious.proxy.internal.FilterHandler.dispatchDecodedRequest(FilterHandler.java:224) ~[classes/:?]
	at io.kroxylicious.proxy.internal.FilterHandler.writeDecodedRequest(FilterHandler.java:205) ~[classes/:?]
	at io.kroxylicious.proxy.internal.FilterHandler.write(FilterHandler.java:143) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:891) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.kroxylicious.proxy.internal.FilterHandler.forwardRequest(FilterHandler.java:341) ~[classes/:?]
	at io.kroxylicious.proxy.internal.FilterHandler.handleRequestFilterResult(FilterHandler.java:271) ~[classes/:?]
	at io.kroxylicious.proxy.internal.FilterHandler.lambda$configureRequestFilterChain$12(FilterHandler.java:233) ~[classes/:?]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:684) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[?:?]
	at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2200) ~[?:?]
	at io.kroxylicious.proxy.internal.FilterHandler.configureRequestFilterChain(FilterHandler.java:233) ~[classes/:?]
	at io.kroxylicious.proxy.internal.FilterHandler.writeDecodedRequest(FilterHandler.java:213) ~[classes/:?]
	at io.kroxylicious.proxy.internal.FilterHandler.write(FilterHandler.java:143) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:891) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1015) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannel.write(AbstractChannel.java:301) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.kroxylicious.proxy.internal.KafkaProxyFrontendHandler.forwardOutbound(KafkaProxyFrontendHandler.java:367) ~[classes/:?]
	at io.kroxylicious.proxy.internal.KafkaProxyFrontendHandler.channelRead(KafkaProxyFrontendHandler.java:194) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.kroxylicious.proxy.internal.ResponseOrderer.channelRead(ResponseOrderer.java:56) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544) ~[netty-transport-classes-kqueue-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:387) ~[netty-transport-classes-kqueue-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:218) ~[netty-transport-classes-kqueue-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:296) ~[netty-transport-classes-kqueue-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.109.Final.jar:4.1.109.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.109.Final.jar:4.1.109.Final]
	at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: org.jose4j.jwt.consumer.InvalidJwtException: JWT (claims->{"iss":"https://dev-j2kfifzhr0wa7o3h.us.auth0.com/","sub":"8adOD5hy3CBTAYUel6RO54b6M1mvP1W8@clients","aud":"https://dev-j2kfifzhr0wa7o3h.us.auth0.com/api/v2/","iat":1716820056,"exp":1716906456,"gty":"client-credentials","azp":"8adOD5hy3CBTAYUel6RO54b6M1mvP1W8"}) rejected due to invalid claims or other invalid content. Additional details: [[8] Audience (aud) claim [https://dev-j2kfifzhr0wa7o3h.us.auth0.com/api/v2/] present in the JWT but no expected audience value(s) were provided to the JWT Consumer. Expected one of [] as an aud value.]
	at org.jose4j.jwt.consumer.JwtConsumer.validate(JwtConsumer.java:459) ~[jose4j-0.9.6.jar:?]
	at org.jose4j.jwt.consumer.JwtConsumer.processContext(JwtConsumer.java:304) ~[jose4j-0.9.6.jar:?]
	at org.jose4j.jwt.consumer.JwtConsumer.process(JwtConsumer.java:410) ~[jose4j-0.9.6.jar:?]
	at org.apache.kafka.common.security.oauthbearer.internals.secured.ValidatorAccessTokenValidator.validate(ValidatorAccessTokenValidator.java:156) ~[kafka-clients-3.7.0.jar:?]
	... 61 more

Examining the stack trace, I see that OAuthBearerSaslServer.process is going down the token == null path, which causes an error response to be returned to the caller. The filter isn't treating this as an exception, so the SaslAuthenticateRequest is being forwarded. You see a warning written to Kroxylicious's logs, but the connection between client and broker is still set up.
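
In other words, the filter has to notice the SASL server's failure and answer the client itself instead of forwarding. A minimal sketch of that shape follows; it is hypothetical, not the merged code. The onSaslAuthenticateRequest signature and the io.kroxylicious.proxy.filter package are taken from the stack trace above, but the FilterContext builder calls and the error handling are best-effort assumptions.

import java.util.concurrent.CompletionStage;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.protocol.Errors;
import io.kroxylicious.proxy.filter.FilterContext;
import io.kroxylicious.proxy.filter.RequestFilterResult;
import io.kroxylicious.proxy.filter.SaslAuthenticateRequestFilter;

// Hypothetical sketch of the failure handling, not the merged implementation.
public class SketchedOauthBearerValidationFilter implements SaslAuthenticateRequestFilter {

    private final SaslServer saslServer; // wraps Kafka's OAuthBearerSaslServer

    SketchedOauthBearerValidationFilter(SaslServer saslServer) {
        this.saslServer = saslServer;
    }

    @Override
    public CompletionStage<RequestFilterResult> onSaslAuthenticateRequest(short apiVersion,
            RequestHeaderData header, SaslAuthenticateRequestData request, FilterContext context) {
        try {
            // Let Kafka's validator callback check the token locally.
            saslServer.evaluateResponse(request.authBytes());
        }
        catch (SaslException e) {
            return reject(context, e.getMessage());
        }
        // OAuthBearerSaslServer can also report a bad token by returning an error
        // challenge instead of throwing (the token == null path described above),
        // so an incomplete exchange must be treated as a failure too, not forwarded.
        if (!saslServer.isComplete()) {
            return reject(context, "invalid OAUTHBEARER token");
        }
        // Token validated locally: forward so the broker still authenticates itself.
        return context.forwardRequest(header, request);
    }

    private CompletionStage<RequestFilterResult> reject(FilterContext context, String message) {
        // Answer the client directly and never touch the broker.
        SaslAuthenticateResponseData response = new SaslAuthenticateResponseData()
                .setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code())
                .setErrorMessage(message);
        return context.requestFilterResultBuilder()
                .shortCircuitResponse(response)
                .completed();
    }
}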

@k-wall
Contributor

k-wall commented May 27, 2024

To work with auth0, I found I needed to pass through SASL_OAUTHBEARER_EXPECTED_AUDIENCE as a config parameter. With that in place, the happy path was good for my test case.

I then tried deliberately poisoning the token presented by the Kafka client (sasl.oauthbearer.token.endpoint.url=file:////tmp/poisonedtoken).

I hit the same problem as above: the SaslAuthenticateRequest is being forwarded despite the duff token.

I have a rough fix, which I share with you as a PR (no tests, no docs for the new property).

https://github.com/callaertanthony/kroxylicious/pull/1/files
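
For anyone reproducing this, the client side is ordinary kafka-clients 3.7 OAUTHBEARER configuration. A sketch follows; the bootstrap address, endpoint host, and credentials are placeholders, while the property names and the login callback handler class are standard kafka-clients ones.

# Kafka client properties for the test above (placeholders marked).
bootstrap.servers=localhost:9192
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
# Real token endpoint for the happy path...
sasl.oauthbearer.token.endpoint.url=https://dev-example.us.auth0.com/oauth/token
# ...or point at a file holding a corrupted token to simulate a duff client:
# sasl.oauthbearer.token.endpoint.url=file:////tmp/poisonedtoken
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="<client-id>" \
  clientSecret="<client-secret>";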

@callaertanthony
Contributor Author

To work with auth0, I found I needed to pass through SASL_OAUTHBEARER_EXPECTED_AUDIENCE as a config parameter. With that in place, the happy path was good for my test case.

I then tried deliberately poisoning the token presented by the Kafka client (sasl.oauthbearer.token.endpoint.url=file:////tmp/poisonedtoken).

I hit the same problem as above: the SaslAuthenticateRequest is being forwarded despite the duff token.

I have a rough fix, which I share with you as a PR (no tests, no docs for the new property).

callaertanthony/kroxylicious#1 (files)

Thanks @k-wall, I added tests, docs, and the issuer property.

Contributor

@tombentley tombentley left a comment


Thanks @callaertanthony, I found a few more minor things worth discussing.

return operation.get();
}
CompletableFuture<A> future = new CompletableFuture<>();
executorService.schedule(() -> {
Contributor


By discarding the returned ScheduledFuture you're basically allowing the client to send very many SaslAuthenticateRequests, each with its own scheduled completion. Other clients share the same event loop (which is what the executorService here actually is), so with enough scheduled events you might get an observable noisy-neighbour effect.

We should probably hold the last scheduled operation's ScheduledFuture in a field. That would allow us to implement a one-scheduler-slot-per-client policy by cancelling the outstanding operation. We'd need to decide what the right thing to do with the two in-flight requests is. I guess respond to the first with some error code, and allow the second to be scheduled. (A sketch of that shape follows.)
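
A minimal sketch of the one-slot idea, inside the filter class and building on the schedule snippet quoted above. The pendingAuthentication field name and the error-handling choices are illustrative assumptions, not the merged design; it assumes the method runs on the channel's single-threaded event loop, so the field needs no locking.

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// One scheduler slot per client: the last scheduled attempt is remembered here.
private ScheduledFuture<?> pendingAuthentication;

private <A> CompletableFuture<A> schedule(Supplier<CompletableFuture<A>> operation, Duration delay) {
    if (pendingAuthentication != null && !pendingAuthentication.isDone()) {
        // Cancel the outstanding attempt before queueing this one; the cancelled
        // request should be answered with an error so its client isn't left hanging.
        pendingAuthentication.cancel(false);
    }
    CompletableFuture<A> future = new CompletableFuture<>();
    pendingAuthentication = executorService.schedule(() -> {
        operation.get().whenComplete((result, error) -> {
            if (error != null) {
                future.completeExceptionally(error);
            }
            else {
                future.complete(result);
            }
        });
    }, delay.toMillis(), TimeUnit.MILLISECONDS);
    return future;
}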

Contributor Author


If we cancel the first request, a response will be sent to the client, no? How will this behaviour prevent the client from starting new requests? We can effectively limit the number of scheduled operations, but I don't see how this helps prevent too many requests.
Maybe we can refuse new requests from a client when another one is already scheduled, to short-circuit fast?

@k-wall
Contributor

k-wall commented May 29, 2024

@callaertanthony I made a couple of suggestions in callaertanthony#2

Contributor

@k-wall k-wall left a comment


LGTM. @tombentley, would you be able to re-review? I think this is good to merge.

@callaertanthony
Contributor Author

Thank you for your help.

Contributor

@tombentley tombentley left a comment



sonarqubecloud bot commented Jun 5, 2024

@tombentley tombentley merged commit 95fe331 into kroxylicious:main Jun 5, 2024
2 checks passed