Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PIP-30: interface and mutual change authentication #3677

Merged
merged 13 commits into from
Mar 13, 2019
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

/**
* Provider of authentication mechanism
Expand Down Expand Up @@ -61,7 +60,7 @@ public interface AuthenticationProvider extends Closeable {
* if the credentials are not valid
*/
default String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
throw new UnsupportedOperationException();
throw new AuthenticationException("Not supported");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.pulsar.broker.authentication;

import java.io.IOException;
import javax.naming.AuthenticationException;
import org.apache.pulsar.common.api.AuthData;

Expand All @@ -40,7 +39,7 @@ public interface AuthenticationState {
/**
* Challenge passed in auth data and get response data.
*/
AuthData authenticate(AuthData authData) throws IOException, AuthenticationException;
AuthData authenticate(AuthData authData) throws AuthenticationException;

/**
* Return AuthenticationDataSource.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import lombok.Getter;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
Expand All @@ -41,7 +40,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Test Mutual Authentication.
Expand All @@ -59,28 +58,30 @@ public class MutualAuthenticationTest extends ProducerConsumerBase {

private static String[] serverAuthStrings = {
"ResponseMutualClientAuthInit", // step 0
"ResponseMutualClientStep1" // step 1
};

public static class MutualAuthenticationDataProvider implements AuthenticationDataProvider {
private int authStep = 0;

@Override
public boolean hasDataFromCommand() {
return true;
}

@Override
public AuthData authenticate(AuthData data) throws AuthenticationException {
String dataString = new String(data.getBytes(), Charset.forName("UTF-8"));
if (!dataString.equalsIgnoreCase("init")) {
if (!dataString.equals(serverAuthStrings[authStep - 1])) {
throw new AuthenticationException();
}
String dataString = new String(data.getBytes(), UTF_8);
AuthData toSend;

if (Arrays.equals(dataString.getBytes(), AuthData.INIT_AUTH_DATA)) {
toSend = AuthData.of(clientAuthStrings[0].getBytes(UTF_8));
} else if (Arrays.equals(dataString.getBytes(), serverAuthStrings[0].getBytes(UTF_8))) {
toSend = AuthData.of(clientAuthStrings[1].getBytes(UTF_8));
} else {
throw new AuthenticationException();
}
log.debug("authenticate in client. passed in :{}, send: clientAuthStrings[{}]: {}",
dataString, authStep, clientAuthStrings[authStep]);
return AuthData.of(clientAuthStrings[authStep ++].getBytes());

log.debug("authenticate in client. passed in :{}, send: {}",
dataString, new String(toSend.getBytes(), UTF_8));
return toSend;
}
}

Expand Down Expand Up @@ -117,7 +118,7 @@ public void start() throws PulsarClientException {


public static class MutualAuthenticationState implements AuthenticationState {
private int authStep = 0;
private boolean isComplete = false;

@Override
public String getAuthRole() throws AuthenticationException {
Expand All @@ -126,14 +127,21 @@ public String getAuthRole() throws AuthenticationException {

@Override
public AuthData authenticate(AuthData authData) throws AuthenticationException {
String clientData = new String(authData.getBytes(), Charset.forName("UTF-8"));
if (!clientData.equals(clientAuthStrings[authStep])) {
String dataString = new String(authData.getBytes(), UTF_8);
AuthData toSend;

if (Arrays.equals(dataString.getBytes(), clientAuthStrings[0].getBytes(UTF_8))) {
toSend = AuthData.of(serverAuthStrings[0].getBytes(UTF_8));
} else if (Arrays.equals(dataString.getBytes(), clientAuthStrings[1].getBytes(UTF_8))) {
isComplete = true;
toSend = AuthData.of(null);
} else {
throw new AuthenticationException();
}
log.debug("authenticate in server. passed in :{}, send: serverAuthStrings[{}]: {}",
clientData, authStep, serverAuthStrings[authStep]);
AuthData serverData = AuthData.of(serverAuthStrings[authStep ++].getBytes());
return serverData;

log.debug("authenticate in server. passed in :{}, send: {}",
dataString, toSend.getBytes() == null ? "null" : new String(toSend.getBytes(), UTF_8));
return toSend;
}

@Override
Expand All @@ -143,7 +151,7 @@ public AuthenticationDataSource getAuthDataSource() {

@Override
public boolean isComplete() {
return authStep > 1;
return isComplete;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import javax.naming.AuthenticationException;
import org.apache.pulsar.common.api.AuthData;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Interface for accessing data which are used in variety of authentication schemes on client side
*/
Expand Down Expand Up @@ -118,7 +120,7 @@ default String getCommandData() {
* Mainly used for mutual authentication like sasl.
*/
default AuthData authenticate(AuthData data) throws IOException, AuthenticationException {
byte[] bytes = (hasDataFromCommand() ? this.getCommandData() : "").getBytes("UTF-8");
byte[] bytes = (hasDataFromCommand() ? this.getCommandData() : "").getBytes(UTF_8);
return AuthData.of(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@

import lombok.Data;

import static java.nio.charset.StandardCharsets.UTF_8;

@Data(staticConstructor="of")
public final class AuthData {
public static byte[] INIT_AUTH_DATA = "init".getBytes(UTF_8);
jiazhai marked this conversation as resolved.
Show resolved Hide resolved

private final byte[] bytes;

public boolean isComplete() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ protected ByteBuf newConnectCommand() throws Exception {
// each channel will have a mutual client/server pair, mutual client evaluateChallenge with init data,
// and return authData to server.
authenticationDataProvider = authentication.getAuthData(remoteHostName);
AuthData authData = authenticationDataProvider.authenticate(AuthData.of("init".getBytes("UTF-8")));
AuthData authData = authenticationDataProvider.authenticate(AuthData.of(AuthData.INIT_AUTH_DATA));
return Commands.newConnect(authentication.getAuthMethodName(), authData, this.protocolVersion,
getPulsarClientVersion(), proxyToTargetBrokerAddress, null, null, null);
}
Expand Down