Skip to content

Commit

Permalink
[fix][authentication] Store the original authentication data (#19519)
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <[email protected]>
  • Loading branch information
nodece authored Feb 16, 2023
1 parent c0f89dc commit 2d90089
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -737,31 +737,32 @@ public void authChallengeSuccessCallback(AuthData authChallenge,
// 2. an authentication refresh, in which case we need to refresh authenticationData
AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState;
String newAuthRole = authState.getAuthRole();

// Refresh the auth data.
this.authenticationData = authState.getAuthDataSource();
if (log.isDebugEnabled()) {
log.debug("[{}] Auth data refreshed for role={}", remoteAddress, this.authRole);
}

if (!useOriginalAuthState) {
this.authRole = newAuthRole;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
}
AuthenticationDataSource newAuthDataSource = authState.getAuthDataSource();

if (state != State.Connected) {
// Set the auth data and auth role
if (!useOriginalAuthState) {
this.authRole = newAuthRole;
this.authenticationData = newAuthDataSource;
}
// First time authentication is done
if (originalAuthState != null) {
// We only set originalAuthState when we are going to use it.
authenticateOriginalData(clientProtocolVersion, clientVersion);
} else {
completeConnect(clientProtocolVersion, clientVersion);
if (log.isDebugEnabled()) {
log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
remoteAddress, authMethod, this.authRole, originalPrincipal);
}
}
} else {
// Refresh the auth data
if (!useOriginalAuthState) {
this.authenticationData = newAuthDataSource;
} else {
this.originalAuthData = newAuthDataSource;
}
// If the connection was already ready, it means we're doing a refresh
if (!StringUtils.isEmpty(authRole)) {
if (!authRole.equals(newAuthRole)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,15 @@
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

import javax.naming.AuthenticationException;
import java.util.concurrent.CompletableFuture;

import static java.nio.charset.StandardCharsets.UTF_8;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;

/**
* Class to use when verifying the behavior around expired authentication data because it will always return
* true when isExpired is called.
*/
public class MockAlwaysExpiredAuthenticationState implements AuthenticationState {
final MockAlwaysExpiredAuthenticationProvider provider;
AuthenticationDataSource authenticationDataSource;
volatile String authRole;

MockAlwaysExpiredAuthenticationState(MockAlwaysExpiredAuthenticationProvider provider) {
this.provider = provider;
}


@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first.");
}
return authRole;
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
return null;
}

/**
* This authentication is always single stage, so it returns immediately
*/
@Override
public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
return provider
.authenticateAsync(authenticationDataSource)
.thenApply(role -> {
authRole = role;
return null;
});
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
return true;
public class MockAlwaysExpiredAuthenticationState extends MockMutableAuthenticationState {
MockAlwaysExpiredAuthenticationState(AuthenticationProvider provider) {
super(provider);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;
import javax.net.ssl.SSLSession;
import java.net.SocketAddress;

public class MockMutableAuthenticationProvider extends MockAuthenticationProvider {
public AuthenticationState newAuthState(AuthData authData,
SocketAddress remoteAddress,
SSLSession sslSession) {
return new MockMutableAuthenticationState(this);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.auth;

import static java.nio.charset.StandardCharsets.UTF_8;
import javax.naming.AuthenticationException;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.common.api.AuthData;

// MockMutableAuthenticationState always update the authentication data source and auth role.
public class MockMutableAuthenticationState implements AuthenticationState {
final AuthenticationProvider provider;
AuthenticationDataSource authenticationDataSource;
volatile String authRole;

MockMutableAuthenticationState(AuthenticationProvider provider) {
this.provider = provider;
}

@Override
public String getAuthRole() throws AuthenticationException {
if (authRole == null) {
throw new AuthenticationException("Must authenticate first.");
}
return authRole;
}

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
return null;
}

/**
* This authentication is always single stage, so it returns immediately
*/
@Override
public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
authenticationDataSource = new AuthenticationDataCommand(new String(authData.getBytes(), UTF_8));
return provider
.authenticateAsync(authenticationDataSource)
.thenApply(role -> {
authRole = role;
return null;
});
}

@Override
public AuthenticationDataSource getAuthDataSource() {
return authenticationDataSource;
}

@Override
public boolean isComplete() {
return true;
}

@Override
public boolean isExpired() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockAlwaysExpiredAuthenticationProvider;
import org.apache.pulsar.broker.auth.MockMutableAuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
Expand Down Expand Up @@ -1030,6 +1031,54 @@ public void testVerifyAuthRoleAndAuthDataFromDirectConnectionBroker() throws Exc
}));
}

@Test
public void testRefreshOriginalPrincipalWithAuthDataForwardedFromProxy() throws Exception {
AuthenticationService authenticationService = mock(AuthenticationService.class);
AuthenticationProvider authenticationProvider = new MockMutableAuthenticationProvider();
String authMethodName = authenticationProvider.getAuthMethodName();
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
svcConfig.setAuthenticationEnabled(true);
svcConfig.setAuthenticateOriginalAuthData(true);
svcConfig.setProxyRoles(Collections.singleton("pass.proxy"));

resetChannel();
assertTrue(channel.isActive());
assertEquals(serverCnx.getState(), State.Start);

String proxyRole = "pass.proxy";
String clientRole = "pass.client";
ByteBuf connect = Commands.newConnect(authMethodName, proxyRole, "test", "localhost",
clientRole, clientRole, authMethodName);
channel.writeInbound(connect);
Object connectResponse = getResponse();
assertTrue(connectResponse instanceof CommandConnected);
assertEquals(serverCnx.getOriginalAuthData().getCommandData(), clientRole);
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), clientRole);
assertEquals(serverCnx.getOriginalPrincipal(), clientRole);
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
assertEquals(serverCnx.getAuthRole(), proxyRole);
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

// Request refreshing the original auth.
// Expected:
// 1. Original role and original data equals to "pass.RefreshOriginAuthData".
// 2. The broker disconnects the client, because the new role doesn't equal the old role.
String newClientRole = "pass.RefreshOriginAuthData";
ByteBuf refreshAuth = Commands.newAuthResponse(authMethodName,
AuthData.of(newClientRole.getBytes(StandardCharsets.UTF_8)), 0, "test");
channel.writeInbound(refreshAuth);

assertEquals(serverCnx.getOriginalAuthData().getCommandData(), newClientRole);
assertEquals(serverCnx.getOriginalAuthState().getAuthRole(), newClientRole);
assertEquals(serverCnx.getAuthData().getCommandData(), proxyRole);
assertEquals(serverCnx.getAuthRole(), proxyRole);
assertEquals(serverCnx.getAuthState().getAuthRole(), proxyRole);

assertFalse(channel.isOpen());
assertFalse(channel.isActive());
}

@Test(timeOut = 30000)
public void testProducerCommand() throws Exception {
resetChannel();
Expand Down

0 comments on commit 2d90089

Please sign in to comment.