From 13d3b36cb75690f3f2c5856d8e02dc98ec787beb Mon Sep 17 00:00:00 2001 From: zhao liwei Date: Fri, 6 Nov 2020 10:38:32 +0800 Subject: [PATCH] feat(security): implement AuthProtocol and Credential classes (#139) --- configuration/pegasus.properties | 7 +- scripts/format-all.sh | 1 + .../infra/pegasus/client/ClientOptions.java | 162 ++++++++---------- .../ReplicaSessionInterceptorManager.java | 8 +- .../SecurityReplicaSessionInterceptor.java | 62 ------- .../infra/pegasus/security/AuthProtocol.java | 27 +++ .../AuthReplicaSessionInterceptor.java | 35 ++++ .../infra/pegasus/security/Credential.java | 45 +++++ .../pegasus/security/KerberosCredential.java | 156 +++++++++++++++++ .../pegasus/security/KerberosProtocol.java | 105 ++++++++++++ .../{rpc/async => security}/Negotiation.java | 25 ++- .../async => security}/NegotiationTest.java | 3 +- 12 files changed, 465 insertions(+), 171 deletions(-) delete mode 100644 src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/SecurityReplicaSessionInterceptor.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/security/AuthProtocol.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/security/AuthReplicaSessionInterceptor.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/security/Credential.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/security/KerberosCredential.java create mode 100644 src/main/java/com/xiaomi/infra/pegasus/security/KerberosProtocol.java rename src/main/java/com/xiaomi/infra/pegasus/{rpc/async => security}/Negotiation.java (76%) rename src/test/java/com/xiaomi/infra/pegasus/{rpc/async => security}/NegotiationTest.java (93%) diff --git a/configuration/pegasus.properties b/configuration/pegasus.properties index 29dbe772..c413d5d6 100644 --- a/configuration/pegasus.properties +++ b/configuration/pegasus.properties @@ -5,5 +5,8 @@ enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 meta_query_timeout = 5000 -service_name = "" -service_fqdn = "" +auth_protocol = +kerberos_service_name = +kerberos_service_fqdn = +kerberos_keytab = +kerberos_principal = diff --git a/scripts/format-all.sh b/scripts/format-all.sh index 38546458..94c9ed2a 100755 --- a/scripts/format-all.sh +++ b/scripts/format-all.sh @@ -33,6 +33,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java src/main/java/com/xiaomi/infra/pegasus/client/request/*.java src/main/java/com/xiaomi/infra/pegasus/base/*.java src/main/java/com/xiaomi/infra/pegasus/example/*.java + src/main/java/com/xiaomi/infra/pegasus/security/*.java src/test/java/com/xiaomi/infra/pegasus/client/*.java src/test/java/com/xiaomi/infra/pegasus/metrics/*.java src/test/java/com/xiaomi/infra/pegasus/rpc/async/*.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index be9e6b23..2494eaed 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -5,6 +5,7 @@ import static com.xiaomi.infra.pegasus.client.PConfigUtil.loadConfiguration; +import com.xiaomi.infra.pegasus.security.Credential; import java.time.Duration; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ConfigurationConverter; @@ -30,7 +31,8 @@ * .falconPerfCounterTags("") * .falconPushInterval(Duration.ofSeconds(10)) * .metaQueryTimeout(Duration.ofMillis(5000)) - * .enableAuth(false) + * .authProtocol("") + * .credential(null) * .build(); * } */ @@ -45,9 +47,7 @@ public class ClientOptions { public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags"; public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout"; - public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth"; - public static final String PEGASUS_SERVICE_NAME_KEY = "service_name"; - public static final String PEGASUS_SERVICE_FQDN_KEY = "service_fqdn"; + public static final String PEGASUS_AUTH_PROTOCOL_KEY = "auth_protocol"; public static final String DEFAULT_META_SERVERS = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; @@ -58,9 +58,7 @@ public class ClientOptions { public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); - public static final boolean DEFAULT_ENABLE_AUTH = false; - public static final String DEFAULT_SERVICE_NAME = ""; - public static final String DEFAULT_SERVICE_FQDN = ""; + public static final String DEFAULT_AUTH_PROTOCOL = ""; private final String metaServers; private final Duration operationTimeout; @@ -70,9 +68,8 @@ public class ClientOptions { private final Duration falconPushInterval; private final boolean enableWriteLimit; private final Duration metaQueryTimeout; - private final boolean enableAuth; - private final String serviceName; - private final String serviceFQDN; + private final String authProtocol; + private final Credential credential; protected ClientOptions(Builder builder) { this.metaServers = builder.metaServers; @@ -83,9 +80,8 @@ protected ClientOptions(Builder builder) { this.falconPushInterval = builder.falconPushInterval; this.enableWriteLimit = builder.enableWriteLimit; this.metaQueryTimeout = builder.metaQueryTimeout; - this.enableAuth = builder.enableAuth; - this.serviceName = builder.serviceName; - this.serviceFQDN = builder.serviceFQDN; + this.authProtocol = builder.authProtocol; + this.credential = builder.credential; } protected ClientOptions(ClientOptions original) { @@ -97,9 +93,8 @@ protected ClientOptions(ClientOptions original) { this.falconPushInterval = original.getFalconPushInterval(); this.enableWriteLimit = original.isWriteLimitEnabled(); this.metaQueryTimeout = original.getMetaQueryTimeout(); - this.enableAuth = original.isEnableAuth(); - this.serviceName = original.getServiceName(); - this.serviceFQDN = original.getServiceFQDN(); + this.authProtocol = original.getAuthProtocol(); + this.credential = original.getCredential(); } /** @@ -159,9 +154,8 @@ public static ClientOptions create(String configPath) throws PException { Duration metaQueryTimeout = Duration.ofMillis( config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis())); - boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH); - String serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME); - String serviceFQDN = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN); + String authProtocol = config.getString(PEGASUS_AUTH_PROTOCOL_KEY, DEFAULT_AUTH_PROTOCOL); + Credential credential = Credential.create(authProtocol, config); return ClientOptions.builder() .metaServers(metaList) @@ -171,9 +165,8 @@ public static ClientOptions create(String configPath) throws PException { .falconPerfCounterTags(perfCounterTags) .falconPushInterval(pushIntervalSecs) .metaQueryTimeout(metaQueryTimeout) - .enableAuth(enableAuth) - .serviceName(serviceName) - .serviceFQDN(serviceFQDN) + .authProtocol(authProtocol) + .credential(credential) .build(); } @@ -192,41 +185,40 @@ public boolean equals(Object options) { && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis() && this.enableWriteLimit == clientOptions.enableWriteLimit && this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis() - && this.enableAuth == clientOptions.enableAuth - && this.serviceName == clientOptions.serviceName - && this.serviceFQDN == clientOptions.serviceFQDN; + && this.authProtocol == clientOptions.authProtocol + && this.credential == clientOptions.credential; } return false; } @Override public String toString() { - return "ClientOptions{" - + "metaServers='" - + metaServers - + '\'' - + ", operationTimeout(ms)=" - + operationTimeout.toMillis() - + ", asyncWorkers=" - + asyncWorkers - + ", enablePerfCounter=" - + enablePerfCounter - + ", falconPerfCounterTags='" - + falconPerfCounterTags - + '\'' - + ", falconPushInterval(s)=" - + falconPushInterval.getSeconds() - + ",enableWriteLimit=" - + enableWriteLimit - + ", metaQueryTimeout(ms)=" - + metaQueryTimeout.toMillis() - + ", enableAuth=" - + enableAuth - + ", serviceName=" - + serviceName - + ", serviceFQDN=" - + serviceFQDN - + '}'; + String res = + "ClientOptions{" + + "metaServers='" + + metaServers + + '\'' + + ", operationTimeout(ms)=" + + operationTimeout.toMillis() + + ", asyncWorkers=" + + asyncWorkers + + ", enablePerfCounter=" + + enablePerfCounter + + ", falconPerfCounterTags='" + + falconPerfCounterTags + + '\'' + + ", falconPushInterval(s)=" + + falconPushInterval.getSeconds() + + ",enableWriteLimit=" + + enableWriteLimit + + ", metaQueryTimeout(ms)=" + + metaQueryTimeout.toMillis() + + ", authProtocol=" + + authProtocol; + if (credential != null) { + res += ", credential=" + credential.toString(); + } + return res + '}'; } /** Builder for {@link ClientOptions}. */ @@ -239,9 +231,8 @@ public static class Builder { private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL; private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT; private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT; - private boolean enableAuth = DEFAULT_ENABLE_AUTH; - private String serviceName = DEFAULT_SERVICE_NAME; - private String serviceFQDN = DEFAULT_SERVICE_FQDN; + private String authProtocol = DEFAULT_AUTH_PROTOCOL; + private Credential credential = null; protected Builder() {} @@ -345,37 +336,28 @@ public Builder metaQueryTimeout(Duration metaQueryTimeout) { } /** - * Whether to enable authentication. Defaults to {@literal false}, see {@link - * #DEFAULT_ENABLE_AUTH}. + * The authentiation protocol to use. Available protocols are: 1. kerberos; 2."" * - * @param enableAuth - * @return {@code this} - */ - public Builder enableAuth(boolean enableAuth) { - this.enableAuth = enableAuth; - return this; - } - - /** - * service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME}. + *

"" means the authentiation is disabled * - * @param serviceName + *

Defaults to {@literal ""}, See {@link #DEFAULT_AUTH_PROTOCOL} + * + * @param authProtocol authentiation protocol. * @return {@code this} */ - public Builder serviceName(String serviceName) { - this.serviceName = serviceName; + public Builder authProtocol(String authProtocol) { + this.authProtocol = authProtocol; return this; } /** - * service full qualified domain name. Defaults to {@literal ""}, see {@link - * #DEFAULT_SERVICE_FQDN}. + * credential info. Defaults to {@literal null} * - * @param serviceFQDN + * @param credential credential * @return {@code this} */ - public Builder serviceFQDN(String serviceFQDN) { - this.serviceFQDN = serviceFQDN; + public Builder credential(Credential credential) { + this.credential = credential; return this; } @@ -407,9 +389,8 @@ public ClientOptions.Builder mutate() { .falconPushInterval(getFalconPushInterval()) .enableWriteLimit(isWriteLimitEnabled()) .metaQueryTimeout(getMetaQueryTimeout()) - .enableAuth(isEnableAuth()) - .serviceName(getServiceName()) - .serviceFQDN(getServiceFQDN()); + .authProtocol(getAuthProtocol()) + .credential(getCredential()); return builder; } @@ -491,29 +472,24 @@ public Duration getMetaQueryTimeout() { } /** - * Whether to enable authentication. Defaults to {@literal false}. + * The authentiation protocol to use. Available protocols are: 1. kerberos; 2."" * - * @return whether to enable authentication. - */ - public boolean isEnableAuth() { - return enableAuth; - } - - /** - * service name. Defaults to {@literal ""}. + *

"" means the authentiation is disabled + * + *

Defaults to {@literal ""}, See {@link #DEFAULT_AUTH_PROTOCOL} * - * @return service name. + * @return authentiation protocol. */ - public String getServiceName() { - return serviceName; + public String getAuthProtocol() { + return authProtocol; } /** - * service full qualified domain name. Defaults to {@literal ""}. + * credential info. Defaults to {@literal null} * - * @return service full qualified domain name. + * @return credential */ - public String getServiceFQDN() { - return serviceFQDN; + public Credential getCredential() { + return credential; } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java index 5c11c6ae..86bb0d6a 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java @@ -20,6 +20,7 @@ import com.xiaomi.infra.pegasus.client.ClientOptions; import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; +import com.xiaomi.infra.pegasus.security.AuthReplicaSessionInterceptor; import java.util.ArrayList; import java.util.List; @@ -27,10 +28,9 @@ public class ReplicaSessionInterceptorManager { private List interceptors = new ArrayList<>(); public ReplicaSessionInterceptorManager(ClientOptions options) { - if (options.isEnableAuth()) { - ReplicaSessionInterceptor securityInterceptor = - new SecurityReplicaSessionInterceptor(options.getServiceName(), options.getServiceFQDN()); - interceptors.add(securityInterceptor); + if (!options.getAuthProtocol().isEmpty()) { + ReplicaSessionInterceptor authInterceptor = new AuthReplicaSessionInterceptor(options); + interceptors.add(authInterceptor); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/SecurityReplicaSessionInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/SecurityReplicaSessionInterceptor.java deleted file mode 100644 index 2a5df7b5..00000000 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/SecurityReplicaSessionInterceptor.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.xiaomi.infra.pegasus.rpc.interceptor; - -import com.sun.security.auth.callback.TextCallbackHandler; -import com.xiaomi.infra.pegasus.rpc.async.Negotiation; -import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; -import javax.security.auth.Subject; -import javax.security.auth.login.LoginContext; -import javax.security.auth.login.LoginException; -import org.slf4j.Logger; - -public class SecurityReplicaSessionInterceptor implements ReplicaSessionInterceptor { - private static final Logger logger = - org.slf4j.LoggerFactory.getLogger(SecurityReplicaSessionInterceptor.class); - - private String serviceName; - private String serviceFqdn; - private Subject subject; - private LoginContext loginContext; - - public SecurityReplicaSessionInterceptor(String serviceName, String serviceFqdn) - throws IllegalArgumentException { - this.serviceName = serviceName; - this.serviceFqdn = serviceFqdn; - - try { - loginContext = new LoginContext("client", new TextCallbackHandler()); - loginContext.login(); - - subject = loginContext.getSubject(); - if (subject == null) { - throw new LoginException("subject is null"); - } - } catch (LoginException le) { - throw new IllegalArgumentException("login failed", le); - } - - logger.info("login succeed, as user {}", subject.getPrincipals().toString()); - } - - public void onConnected(ReplicaSession session) { - Negotiation negotiation = new Negotiation(session, subject, serviceName, serviceFqdn); - negotiation.start(); - } -} diff --git a/src/main/java/com/xiaomi/infra/pegasus/security/AuthProtocol.java b/src/main/java/com/xiaomi/infra/pegasus/security/AuthProtocol.java new file mode 100644 index 00000000..b309df29 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/security/AuthProtocol.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.xiaomi.infra.pegasus.security; + +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; + +/** authentiation protocol */ +public interface AuthProtocol { + /** start the authentiate process */ + void authenticate(ReplicaSession session); +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/security/AuthReplicaSessionInterceptor.java b/src/main/java/com/xiaomi/infra/pegasus/security/AuthReplicaSessionInterceptor.java new file mode 100644 index 00000000..877c7c9d --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/security/AuthReplicaSessionInterceptor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.xiaomi.infra.pegasus.security; + +import com.xiaomi.infra.pegasus.client.ClientOptions; +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; +import com.xiaomi.infra.pegasus.rpc.interceptor.ReplicaSessionInterceptor; + +public class AuthReplicaSessionInterceptor implements ReplicaSessionInterceptor { + private AuthProtocol protocol; + + public AuthReplicaSessionInterceptor(ClientOptions options) throws IllegalArgumentException { + this.protocol = options.getCredential().getProtocol(); + } + + public void onConnected(ReplicaSession session) { + protocol.authenticate(session); + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/security/Credential.java b/src/main/java/com/xiaomi/infra/pegasus/security/Credential.java new file mode 100644 index 00000000..7ea3a5f9 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/security/Credential.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.xiaomi.infra.pegasus.security; + +import org.apache.commons.configuration2.Configuration; + +/** credential info for authentiation */ +public interface Credential { + String KERBEROS_PROTOCOL_NAME = "kerberos"; + + static Credential create(String authProtocol, Configuration config) + throws IllegalArgumentException { + Credential credential; + if (authProtocol.equals(KERBEROS_PROTOCOL_NAME)) { + credential = new KerberosCredential(config); + } else if (authProtocol.isEmpty()) { + credential = null; + } else { + throw new IllegalArgumentException("unsupported protocol: " + authProtocol); + } + + return credential; + } + + /** get the authentiation protocol supported */ + AuthProtocol getProtocol(); + + String toString(); +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/security/KerberosCredential.java b/src/main/java/com/xiaomi/infra/pegasus/security/KerberosCredential.java new file mode 100644 index 00000000..14d1036a --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/security/KerberosCredential.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.xiaomi.infra.pegasus.security; + +import org.apache.commons.configuration2.Configuration; + +/** + * kerberos credential information + * + *

To create a new instance with Configuration: + * + *

{@code
+ * new KerberosCredential(config);
+ * }
+ * + * To customize the settings: + * + *
{@code
+ * KerberosCredential credential =
+ *      KerberosCredential.Builder()
+ *          .serviceName("")
+ *          .serviceFqdn("")
+ *          .keytab("")
+ *          .principal("")
+ *          .build();
+ * }
+ */ +class KerberosCredential implements Credential { + public static final String PEGASUS_SERVICE_NAME_KEY = "kerberos_service_name"; + public static final String PEGASUS_SERVICE_FQDN_KEY = "kerberos_service_fqdn"; + public static final String PEGASUS_KEYTAB_KEY = "kerberos_keytab"; + public static final String PEGASUS_PRINCIPAL_KEY = "kerberos_principal"; + + public static final String DEFAULT_SERVICE_NAME = ""; + public static final String DEFAULT_SERVICE_FQDN = ""; + public static final String DEFAULT_KEYTAB = ""; + public static final String DEFAULT_PRINCIPAL = ""; + + private String serviceName; + private String serviceFqdn; + private String keyTab; + private String principal; + + KerberosCredential(Configuration config) { + this.serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME); + this.serviceFqdn = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN); + this.keyTab = config.getString(PEGASUS_KEYTAB_KEY, DEFAULT_KEYTAB); + this.principal = config.getString(PEGASUS_PRINCIPAL_KEY, DEFAULT_PRINCIPAL); + } + + KerberosCredential(Builder builder) { + this.serviceName = builder.serviceName; + this.serviceFqdn = builder.serviceFqdn; + this.keyTab = builder.keyTab; + this.principal = builder.principal; + } + + @Override + public AuthProtocol getProtocol() { + return new KerberosProtocol(serviceName, serviceFqdn, keyTab, principal); + } + + @Override + public String toString() { + return "KerberosCredential{" + + "serviceName='" + + serviceName + + '\'' + + ", serviceFqdn='" + + serviceFqdn + + '\'' + + ", keyTab='" + + keyTab + + '\'' + + ", principal='" + + principal + + '\'' + + '}'; + } + + public static class Builder { + private String serviceName; + private String serviceFqdn; + private String keyTab; + private String principal; + + /** + * kerberos service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME} + * + * @param serviceName + * @return {@code this} + */ + public Builder serviceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + /** + * kerberos service fqdn. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_FQDN} + * + * @param serviceFqdn + * @return {@code this} + */ + public Builder serviceFqdn(String serviceFqdn) { + this.serviceFqdn = serviceFqdn; + return this; + } + + /** + * kerberos keytab file path. Defaults to {@literal ""}, see {@link #DEFAULT_KEYTAB}. + * + * @param keyTab + * @return {@code this} + */ + public Builder keyTab(String keyTab) { + this.keyTab = keyTab; + return this; + } + + /** + * kerberos principal. Defaults to {@literal ""}, see {@link #DEFAULT_PRINCIPAL}. + * + * @param principal + * @return {@code this} + */ + public Builder principal(String principal) { + this.principal = principal; + return this; + } + + /** + * Create a new instance of {@link KerberosCredential}. + * + * @return new instance of {@link KerberosCredential}. + */ + public KerberosCredential build() { + return new KerberosCredential(this); + } + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/security/KerberosProtocol.java b/src/main/java/com/xiaomi/infra/pegasus/security/KerberosProtocol.java new file mode 100644 index 00000000..6ee8779d --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/security/KerberosProtocol.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.xiaomi.infra.pegasus.security; + +import com.sun.security.auth.callback.TextCallbackHandler; +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; +import java.util.HashMap; +import java.util.Map; +import javax.security.auth.Subject; +import javax.security.auth.login.AppConfigurationEntry; +import javax.security.auth.login.Configuration; +import javax.security.auth.login.LoginContext; +import javax.security.auth.login.LoginException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class KerberosProtocol implements AuthProtocol { + private static final Logger logger = LoggerFactory.getLogger(KerberosProtocol.class); + + // Subject is a JAAS internal class, Ref: + // https://docs.oracle.com/javase/7/docs/technotes/guides/security/jaas/JAASRefGuide.html + // + // To authorize access to resources, applications first need to authenticate the source of the + // request. The JAAS framework defines the term "subject" to represent the source of a request. A + // subject may be any entity, such as a person or a service. + private Subject subject; + // The LoginContext class provides the basic methods used to authenticate subjects, and provides a + // way to develop an application independent of the underlying authentication technology + private LoginContext loginContext; + private String serviceName; + private String serviceFqdn; + + KerberosProtocol(String serviceName, String serviceFqdn, String keyTab, String principal) + throws IllegalArgumentException { + this.serviceName = serviceName; + this.serviceFqdn = serviceFqdn; + try { + // Authenticate the Subject (the source of the request) + // A LoginModule uses a CallbackHandler to communicate with the user to obtain authentication + // information + this.subject = new Subject(); + this.loginContext = + new LoginContext( + "pegasus-client", + subject, + new TextCallbackHandler(), + getLoginContextConfiguration(keyTab, principal)); + this.loginContext.login(); + } catch (LoginException le) { + throw new IllegalArgumentException("login failed: ", le); + } + + logger.info("login succeed, as user {}", subject.getPrincipals().toString()); + } + + @Override + public void authenticate(ReplicaSession session) { + Negotiation negotiation = new Negotiation(session, subject, serviceName, serviceFqdn); + negotiation.start(); + } + + private static Configuration getLoginContextConfiguration(String keyTab, String principal) { + return new Configuration() { + @Override + public AppConfigurationEntry[] getAppConfigurationEntry(String name) { + Map options = new HashMap<>(); + // TGT is obtained from the ticket cache. + options.put("useTicketCache", "true"); + // get the principal's key from the the keytab + options.put("useKeyTab", "true"); + // renew the TGT + options.put("renewTGT", "true"); + // keytab or the principal's key to be stored in the Subject's private credentials. + options.put("storeKey", "true"); + // the file name of the keytab to get principal's secret key. + options.put("keyTab", keyTab); + // the name of the principal that should be used + options.put("principal", principal); + + return new AppConfigurationEntry[] { + new AppConfigurationEntry( + "com.sun.security.auth.module.Krb5LoginModule", + AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, + options) + }; + } + }; + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java b/src/main/java/com/xiaomi/infra/pegasus/security/Negotiation.java similarity index 76% rename from src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java rename to src/main/java/com/xiaomi/infra/pegasus/security/Negotiation.java index da950fa2..400ad57e 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/Negotiation.java +++ b/src/main/java/com/xiaomi/infra/pegasus/security/Negotiation.java @@ -16,20 +16,22 @@ * specific language governing permissions and limitations * under the License. */ -package com.xiaomi.infra.pegasus.rpc.async; +package com.xiaomi.infra.pegasus.security; +import com.xiaomi.infra.pegasus.apps.negotiation_request; import com.xiaomi.infra.pegasus.apps.negotiation_response; import com.xiaomi.infra.pegasus.apps.negotiation_status; import com.xiaomi.infra.pegasus.base.blob; import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.operator.negotiation_operator; import com.xiaomi.infra.pegasus.rpc.ReplicationException; +import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession; import java.util.HashMap; import javax.security.auth.Subject; import javax.security.sasl.Sasl; import org.slf4j.Logger; -public class Negotiation { +class Negotiation { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Negotiation.class); private negotiation_status status; private ReplicaSession session; @@ -38,8 +40,11 @@ public class Negotiation { private final HashMap props = new HashMap(); private final Subject subject; - public Negotiation( - ReplicaSession session, Subject subject, String serviceName, String serviceFqdn) { + // Because negotiation message is always the first rpc sent to pegasus server, + // which will cost much more time. so we set negotiation timeout to 10s here + private static final int negotiationTimeoutMS = 10000; + + Negotiation(ReplicaSession session, Subject subject, String serviceName, String serviceFqdn) { this.session = session; this.subject = subject; this.serviceName = serviceName; @@ -47,16 +52,18 @@ public Negotiation( this.props.put(Sasl.QOP, "auth"); } - public void start() { + void start() { status = negotiation_status.SASL_LIST_MECHANISMS; send(status, new blob(new byte[0])); } - public void send(negotiation_status status, blob msg) { - // TODO: send negotiation message, using RecvHandler to handle the corresponding response. + void send(negotiation_status status, blob msg) { + negotiation_request request = new negotiation_request(status, msg); + negotiation_operator operator = new negotiation_operator(request); + session.asyncSend(operator, new RecvHandler(operator), negotiationTimeoutMS, false); } - private class RecvHandler implements Runnable { + private static class RecvHandler implements Runnable { negotiation_operator op; RecvHandler(negotiation_operator op) { @@ -93,7 +100,7 @@ private void handleResponse() throws Exception { } } - public negotiation_status get_status() { + negotiation_status get_status() { return status; } } diff --git a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java b/src/test/java/com/xiaomi/infra/pegasus/security/NegotiationTest.java similarity index 93% rename from src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java rename to src/test/java/com/xiaomi/infra/pegasus/security/NegotiationTest.java index 2940a64d..d47755d9 100644 --- a/src/test/java/com/xiaomi/infra/pegasus/rpc/async/NegotiationTest.java +++ b/src/test/java/com/xiaomi/infra/pegasus/security/NegotiationTest.java @@ -16,11 +16,12 @@ * specific language governing permissions and limitations * under the License. */ -package com.xiaomi.infra.pegasus.rpc.async; +package com.xiaomi.infra.pegasus.security; import static com.xiaomi.infra.pegasus.apps.negotiation_status.SASL_LIST_MECHANISMS; import static org.mockito.ArgumentMatchers.any; +import com.xiaomi.infra.pegasus.security.Negotiation; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito;