Skip to content

Commit

Permalink
feat: implement kafka security policy
Browse files Browse the repository at this point in the history
  • Loading branch information
phiz71 committed Nov 12, 2024
1 parent 8f6270f commit f1db2f1
Show file tree
Hide file tree
Showing 5 changed files with 340 additions and 153 deletions.
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@

<!--
nimbus-jose-jwt is kept at version 9.15.2 on purpose because 9.25.4 introduced a breaking change in the way empty signatures are handled
https://bitbucket.org/connect2id/nimbus-jose-jwt/src/94a2e4d11ce322966d30a551a2d482936b588c87/CHANGELOG.txt#lines-1477:1481
https://bitbucket.org/connect2id/nimbus-jose-jwt/src/94a2e4d11ce322966d30a551a2d482936b588c87/CHANGELOG.txt#lines-1477:1481
-->
<dependency>
<groupId>com.nimbusds</groupId>
Expand Down Expand Up @@ -143,6 +143,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test scope -->
<dependency>
<groupId>io.gravitee.apim.gateway</groupId>
Expand Down
245 changes: 147 additions & 98 deletions src/main/java/io/gravitee/policy/jwt/JWTPolicy.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import com.nimbusds.jwt.JWTClaimsSet;
import io.gravitee.common.security.jwt.LazyJWT;
import io.gravitee.gateway.reactive.api.ExecutionFailure;
import io.gravitee.gateway.reactive.api.context.base.BaseExecutionContext;
import io.gravitee.gateway.reactive.api.context.http.HttpPlainExecutionContext;
import io.gravitee.gateway.reactive.api.context.http.HttpPlainRequest;
import io.gravitee.gateway.reactive.api.context.kafka.KafkaConnectionContext;
import io.gravitee.gateway.reactive.api.policy.SecurityToken;
import io.gravitee.gateway.reactive.api.policy.http.HttpSecurityPolicy;
import io.gravitee.gateway.reactive.api.policy.kafka.KafkaSecurityPolicy;
import io.gravitee.policy.jwt.configuration.JWTPolicyConfiguration;
import io.gravitee.policy.jwt.jwk.provider.DefaultJWTProcessorProvider;
import io.gravitee.policy.jwt.jwk.provider.JWTProcessorProvider;
Expand All @@ -39,7 +42,13 @@
import io.reactivex.rxjava3.core.Single;
import io.vertx.rxjava3.core.http.HttpHeaders;
import java.text.ParseException;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import javax.security.auth.callback.Callback;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback;
import org.apache.kafka.common.security.oauthbearer.internals.secured.BasicOAuthBearerToken;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -48,7 +57,7 @@
* @author Jeoffrey HAEYAERT (jeoffrey.haeyaert at graviteesource.com)
* @author GraviteeSource Team
*/
public class JWTPolicy extends JWTPolicyV3 implements HttpSecurityPolicy {
public class JWTPolicy extends JWTPolicyV3 implements HttpSecurityPolicy, KafkaSecurityPolicy {

public static final String CONTEXT_ATTRIBUTE_JWT = "jwt";
private static final Logger log = LoggerFactory.getLogger(JWTPolicy.class);
Expand Down Expand Up @@ -77,22 +86,12 @@ public int order() {

@Override
public Maybe<SecurityToken> extractSecurityToken(HttpPlainExecutionContext ctx) {
LazyJWT jwtToken = ctx.getAttribute(CONTEXT_ATTRIBUTE_JWT);

if (jwtToken == null) {
jwtToken = TokenExtractor.extract(ctx.request()).map(LazyJWT::new).orElse(null);
}

if (jwtToken != null) {
ctx.setAttribute(CONTEXT_ATTRIBUTE_JWT, jwtToken);
String clientId = getClientId(jwtToken);
if (clientId != null) {
return Maybe.just(SecurityToken.forClientId(clientId));
}
return Maybe.just(SecurityToken.invalid(SecurityToken.TokenType.CLIENT_ID));
}
return getSecurityTokenFromContext(ctx);
}

return Maybe.empty();
@Override
public Maybe<SecurityToken> extractSecurityToken(KafkaConnectionContext ctx) {
return getSecurityTokenFromContext(ctx);
}

/**
Expand All @@ -108,58 +107,92 @@ public boolean requireSubscription() {

@Override
public Completable onRequest(HttpPlainExecutionContext ctx) {
return handleSecurity(ctx);
return handleSecurity(ctx)
.flatMapCompletable(jwtClaimsSet ->
Completable.fromRunnable(() -> {
if (!configuration.isPropagateAuthHeader()) {
ctx.request().headers().remove(HttpHeaders.AUTHORIZATION);
}
})
);
}

private Completable handleSecurity(final HttpPlainExecutionContext ctx) {
return extractToken(ctx)
.flatMapSingle(jwt -> validateToken(ctx, jwt).doOnSuccess(claims -> setAuthContextInfos(ctx, jwt, claims)))
.ignoreElement();
}
@Override
public Completable authenticate(KafkaConnectionContext ctx) {
return handleSecurity(ctx)
.flatMapCompletable(jwtClaimsSet ->
Completable.fromRunnable(() -> {
Callback[] callbacks = ctx.callbacks();
for (Callback callback : callbacks) {
if (callback instanceof OAuthBearerValidatorCallback oauthCallback) {
String extractedToken = ctx.getAttribute(CONTEXT_ATTRIBUTE_TOKEN);
String user = ctx.getAttribute(ATTR_USER);
Date expirationTime = jwtClaimsSet.getExpirationTime();
Date issueTime = jwtClaimsSet.getIssueTime();

private void setAuthContextInfos(HttpPlainExecutionContext ctx, LazyJWT jwt, JWTClaimsSet claims) {
final HttpPlainRequest request = ctx.request();
OAuthBearerToken token = new BasicOAuthBearerToken(
extractedToken,
Set.of(), // Scopes are fully managed by Gravitee, it is useless to extract & provide them to the Kafka security context.
(expirationTime == null ? Long.MAX_VALUE : expirationTime.getTime()),
user,
(issueTime == null ? null : issueTime.getTime())
);

// 3_ Set access_token in context
ctx.setAttribute(CONTEXT_ATTRIBUTE_TOKEN, jwt.getToken());
oauthCallback.token(token);
}
}
})
);
}

String clientId = getClientId(claims);
ctx.setAttribute(CONTEXT_ATTRIBUTE_OAUTH_CLIENT_ID, clientId);
private Maybe<SecurityToken> getSecurityTokenFromContext(BaseExecutionContext ctx) {
LazyJWT jwtToken = ctx.getAttribute(CONTEXT_ATTRIBUTE_JWT);

final String user;
if (configuration.getUserClaim() != null && !configuration.getUserClaim().isEmpty()) {
user = (String) claims.getClaim(configuration.getUserClaim());
} else {
user = claims.getSubject();
if (jwtToken == null) {
jwtToken = TokenExtractor.extract(ctx).map(LazyJWT::new).orElse(null);
}
ctx.setAttribute(ATTR_USER, user);
Metrics metrics = ctx.metrics();
metrics.setUser(user);
metrics.setSecurityType(JWT);
metrics.setSecurityToken(clientId);

if (configuration.isExtractClaims()) {
ctx.setAttribute(CONTEXT_ATTRIBUTE_JWT_CLAIMS, claims.getClaims());
if (jwtToken != null) {
ctx.setAttribute(CONTEXT_ATTRIBUTE_JWT, jwtToken);
String clientId = getClientId(jwtToken);
if (clientId != null) {
return Maybe.just(SecurityToken.forClientId(clientId));
}
return Maybe.just(SecurityToken.invalid(SecurityToken.TokenType.CLIENT_ID));
}

if (!configuration.isPropagateAuthHeader()) {
request.headers().remove(HttpHeaders.AUTHORIZATION);
return Maybe.empty();
}

private String getClientId(LazyJWT jwtToken) {
try {
JWT jwt = jwtToken.getDelegate();
if (jwt != null) {
return getClientId(jwt.getJWTClaimsSet());
}
} catch (ParseException e) {
log.error("Failed to parse JWT claim set while looking for clientId", e);
}
return null;
}

private Maybe<LazyJWT> extractToken(HttpPlainExecutionContext ctx) {
Optional<String> token = TokenExtractor.extract(ctx.request());
private Single<JWTClaimsSet> handleSecurity(final BaseExecutionContext ctx) {
return fetchJWTToken(ctx).flatMap(jwt -> validateToken(ctx, jwt).doOnSuccess(claims -> setAuthContextInfos(ctx, jwt, claims)));
}

private Single<LazyJWT> fetchJWTToken(BaseExecutionContext ctx) {
Optional<String> token = TokenExtractor.extract(ctx);
if (token.isEmpty()) {
return interrupt401AsMaybe(ctx, JWT_MISSING_TOKEN_KEY);
return interruptUnauthorized(ctx, JWT_MISSING_TOKEN_KEY);
}
String tokenValue = token.get();
if (tokenValue.isEmpty()) {
return interrupt401AsMaybe(ctx, JWT_INVALID_TOKEN_KEY);
return interruptUnauthorized(ctx, JWT_INVALID_TOKEN_KEY);
}
return Maybe.just(new LazyJWT(token.get()));
return Single.just(new LazyJWT(token.get()));
}

private Single<JWTClaimsSet> validateToken(HttpPlainExecutionContext ctx, LazyJWT jwt) {
private Single<JWTClaimsSet> validateToken(BaseExecutionContext ctx, LazyJWT jwt) {
return jwtProcessorResolver
.provide(ctx)
.flatMapSingle(jwtProcessor -> {
Expand All @@ -169,76 +202,92 @@ private Single<JWTClaimsSet> validateToken(HttpPlainExecutionContext ctx, LazyJW
jwtClaimsSet = jwtProcessor.process(jwt.getDelegate(), null);
} catch (Exception exception) {
reportError(ctx, exception);
return interrupt401AsSingle(ctx, JWT_INVALID_TOKEN_KEY);
return interruptUnauthorized(ctx, JWT_INVALID_TOKEN_KEY);
}

// Validate confirmation method
JWTPolicyConfiguration.ConfirmationMethodValidation confirmationMethodValidation =
configuration.getConfirmationMethodValidation();
if (confirmationMethodValidation != null && confirmationMethodValidation.getCertificateBoundThumbprint().isEnabled()) {
if (
!isValidCertificateThumbprint(
jwtClaimsSet,
ctx.request().tlsSession(),
ctx.request().headers(),
confirmationMethodValidation.isIgnoreMissing(),
confirmationMethodValidation.getCertificateBoundThumbprint()
)
) {
return interrupt401AsSingle(ctx, JWT_INVALID_CERTIFICATE_BOUND_THUMBPRINT);
// FIXME: Kafka Gateway - https://gravitee.atlassian.net/browse/APIM-7523
if (ctx instanceof HttpPlainExecutionContext httpPlainExecutionContext) {
// Validate confirmation method
JWTPolicyConfiguration.ConfirmationMethodValidation confirmationMethodValidation =
configuration.getConfirmationMethodValidation();
if (confirmationMethodValidation != null && confirmationMethodValidation.getCertificateBoundThumbprint().isEnabled()) {
if (
!isValidCertificateThumbprint(
jwtClaimsSet,
httpPlainExecutionContext.request().tlsSession(),
httpPlainExecutionContext.request().headers(),
confirmationMethodValidation.isIgnoreMissing(),
confirmationMethodValidation.getCertificateBoundThumbprint()
)
) {
return interruptUnauthorized(httpPlainExecutionContext, JWT_INVALID_CERTIFICATE_BOUND_THUMBPRINT);
}
}
}
return Single.just(jwtClaimsSet);
})
.toSingle();
}

private <T> Maybe<T> interrupt401AsMaybe(HttpPlainExecutionContext ctx, String key) {
return interrupt401(ctx, key).toMaybe();
}
private void setAuthContextInfos(BaseExecutionContext ctx, LazyJWT jwt, JWTClaimsSet claims) {
// 3_ Set access_token in context
ctx.setAttribute(CONTEXT_ATTRIBUTE_TOKEN, jwt.getToken());

String clientId = getClientId(claims);
ctx.setAttribute(CONTEXT_ATTRIBUTE_OAUTH_CLIENT_ID, clientId);

final String user;
if (configuration.getUserClaim() != null && !configuration.getUserClaim().isEmpty()) {
user = (String) claims.getClaim(configuration.getUserClaim());
} else {
user = claims.getSubject();
}
ctx.setAttribute(ATTR_USER, user);
Metrics metrics = ctx.metrics();
metrics.setUser(user);
metrics.setSecurityType(JWT);
metrics.setSecurityToken(clientId);

private <T> Single<T> interrupt401AsSingle(HttpPlainExecutionContext ctx, String key) {
return interrupt401(ctx, key).<T>toMaybe().toSingle();
if (configuration.isExtractClaims()) {
ctx.setAttribute(CONTEXT_ATTRIBUTE_JWT_CLAIMS, claims.getClaims());
}
}

private Completable interrupt401(HttpPlainExecutionContext ctx, String key) {
return ctx.interruptWith(new ExecutionFailure(UNAUTHORIZED_401).key(key).message(UNAUTHORIZED_MESSAGE));
private <T> Single<T> interruptUnauthorized(BaseExecutionContext ctx, String key) {
if (ctx instanceof HttpPlainExecutionContext httpPlainExecutionContext) {
return httpPlainExecutionContext
.interruptWith(new ExecutionFailure(UNAUTHORIZED_401).key(key).message(UNAUTHORIZED_MESSAGE))
.<T>toMaybe()
.toSingle();
}
// FIXME: Kafka Gateway - manage interruption with Kafka.
return Single.error(new Exception(key));
}

private void reportError(HttpPlainExecutionContext ctx, Throwable throwable) {
private void reportError(BaseExecutionContext ctx, Throwable throwable) {
if (throwable != null) {
final HttpPlainRequest request = ctx.request();
ctx.metrics().setErrorMessage(throwable.getMessage());

if (log.isDebugEnabled()) {
try {
final String api = ctx.getAttribute(ATTR_API);
MDC.put("api", api);

log.debug(
"[api-id:{}] [request-id:{}] [request-path:{}] {}",
api,
request.id(),
request.path(),
throwable.getMessage(),
throwable
);
} finally {
MDC.remove("api");
}
}
}
}
if (ctx instanceof HttpPlainExecutionContext httpPlainExecutionContext) {
try {
final HttpPlainRequest request = httpPlainExecutionContext.request();
final String api = ctx.getAttribute(ATTR_API);
MDC.put("api", api);

private String getClientId(LazyJWT jwtToken) {
try {
JWT jwt = jwtToken.getDelegate();
if (jwt != null) {
return getClientId(jwt.getJWTClaimsSet());
log.debug(
"[api-id:{}] [request-id:{}] [request-path:{}] {}",
api,
request.id(),
request.path(),
throwable.getMessage(),
throwable
);
} finally {
MDC.remove("api");
}
}
}
} catch (ParseException e) {
log.error("Failed to parse JWT claim set while looking for clientId", e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ public Single<Resource> retrieve(String url) {

HttpClient httpClient = buildHttpClient(finalURL);

final RequestOptions requestOptions = new RequestOptions().setMethod(HttpMethod.GET).setAbsoluteURI(url).setTimeout(requestTimeout);
final RequestOptions requestOptions = new RequestOptions()
.setMethod(HttpMethod.GET)
.setAbsoluteURI(finalURL)
.setTimeout(requestTimeout);

return httpClient
.rxRequest(requestOptions)
Expand Down
Loading

0 comments on commit f1db2f1

Please sign in to comment.