diff --git a/configuration/pegasus.properties b/configuration/pegasus.properties index 29dbe772..c413d5d6 100644 --- a/configuration/pegasus.properties +++ b/configuration/pegasus.properties @@ -5,5 +5,8 @@ enable_perf_counter = false perf_counter_tags = cluster=onebox,app=unit_test push_counter_interval_secs = 10 meta_query_timeout = 5000 -service_name = "" -service_fqdn = "" +auth_protocol = +kerberos_service_name = +kerberos_service_fqdn = +kerberos_keytab = +kerberos_principal = diff --git a/scripts/format-all.sh b/scripts/format-all.sh index 38546458..94c9ed2a 100755 --- a/scripts/format-all.sh +++ b/scripts/format-all.sh @@ -33,6 +33,7 @@ SRC_FILES=(src/main/java/com/xiaomi/infra/pegasus/client/*.java src/main/java/com/xiaomi/infra/pegasus/client/request/*.java src/main/java/com/xiaomi/infra/pegasus/base/*.java src/main/java/com/xiaomi/infra/pegasus/example/*.java + src/main/java/com/xiaomi/infra/pegasus/security/*.java src/test/java/com/xiaomi/infra/pegasus/client/*.java src/test/java/com/xiaomi/infra/pegasus/metrics/*.java src/test/java/com/xiaomi/infra/pegasus/rpc/async/*.java diff --git a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java index be9e6b23..2494eaed 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java +++ b/src/main/java/com/xiaomi/infra/pegasus/client/ClientOptions.java @@ -5,6 +5,7 @@ import static com.xiaomi.infra.pegasus.client.PConfigUtil.loadConfiguration; +import com.xiaomi.infra.pegasus.security.Credential; import java.time.Duration; import org.apache.commons.configuration2.Configuration; import org.apache.commons.configuration2.ConfigurationConverter; @@ -30,7 +31,8 @@ * .falconPerfCounterTags("") * .falconPushInterval(Duration.ofSeconds(10)) * .metaQueryTimeout(Duration.ofMillis(5000)) - * .enableAuth(false) + * .authProtocol("") + * .credential(null) * .build(); * } */ @@ -45,9 +47,7 @@ public class ClientOptions { public static final String PEGASUS_PERF_COUNTER_TAGS_KEY = "perf_counter_tags"; public static final String PEGASUS_PUSH_COUNTER_INTERVAL_SECS_KEY = "push_counter_interval_secs"; public static final String PEGASUS_META_QUERY_TIMEOUT_KEY = "meta_query_timeout"; - public static final String PEGASUS_ENABLE_AUTH_KEY = "enable_auth"; - public static final String PEGASUS_SERVICE_NAME_KEY = "service_name"; - public static final String PEGASUS_SERVICE_FQDN_KEY = "service_fqdn"; + public static final String PEGASUS_AUTH_PROTOCOL_KEY = "auth_protocol"; public static final String DEFAULT_META_SERVERS = "127.0.0.1:34601,127.0.0.1:34602,127.0.0.1:34603"; @@ -58,9 +58,7 @@ public class ClientOptions { public static final Duration DEFAULT_FALCON_PUSH_INTERVAL = Duration.ofSeconds(10); public static final boolean DEFAULT_ENABLE_WRITE_LIMIT = true; public static final Duration DEFAULT_META_QUERY_TIMEOUT = Duration.ofMillis(5000); - public static final boolean DEFAULT_ENABLE_AUTH = false; - public static final String DEFAULT_SERVICE_NAME = ""; - public static final String DEFAULT_SERVICE_FQDN = ""; + public static final String DEFAULT_AUTH_PROTOCOL = ""; private final String metaServers; private final Duration operationTimeout; @@ -70,9 +68,8 @@ public class ClientOptions { private final Duration falconPushInterval; private final boolean enableWriteLimit; private final Duration metaQueryTimeout; - private final boolean enableAuth; - private final String serviceName; - private final String serviceFQDN; + private final String authProtocol; + private final Credential credential; protected ClientOptions(Builder builder) { this.metaServers = builder.metaServers; @@ -83,9 +80,8 @@ protected ClientOptions(Builder builder) { this.falconPushInterval = builder.falconPushInterval; this.enableWriteLimit = builder.enableWriteLimit; this.metaQueryTimeout = builder.metaQueryTimeout; - this.enableAuth = builder.enableAuth; - this.serviceName = builder.serviceName; - this.serviceFQDN = builder.serviceFQDN; + this.authProtocol = builder.authProtocol; + this.credential = builder.credential; } protected ClientOptions(ClientOptions original) { @@ -97,9 +93,8 @@ protected ClientOptions(ClientOptions original) { this.falconPushInterval = original.getFalconPushInterval(); this.enableWriteLimit = original.isWriteLimitEnabled(); this.metaQueryTimeout = original.getMetaQueryTimeout(); - this.enableAuth = original.isEnableAuth(); - this.serviceName = original.getServiceName(); - this.serviceFQDN = original.getServiceFQDN(); + this.authProtocol = original.getAuthProtocol(); + this.credential = original.getCredential(); } /** @@ -159,9 +154,8 @@ public static ClientOptions create(String configPath) throws PException { Duration metaQueryTimeout = Duration.ofMillis( config.getLong(PEGASUS_META_QUERY_TIMEOUT_KEY, DEFAULT_META_QUERY_TIMEOUT.toMillis())); - boolean enableAuth = config.getBoolean(PEGASUS_ENABLE_AUTH_KEY, DEFAULT_ENABLE_AUTH); - String serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME); - String serviceFQDN = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN); + String authProtocol = config.getString(PEGASUS_AUTH_PROTOCOL_KEY, DEFAULT_AUTH_PROTOCOL); + Credential credential = Credential.create(authProtocol, config); return ClientOptions.builder() .metaServers(metaList) @@ -171,9 +165,8 @@ public static ClientOptions create(String configPath) throws PException { .falconPerfCounterTags(perfCounterTags) .falconPushInterval(pushIntervalSecs) .metaQueryTimeout(metaQueryTimeout) - .enableAuth(enableAuth) - .serviceName(serviceName) - .serviceFQDN(serviceFQDN) + .authProtocol(authProtocol) + .credential(credential) .build(); } @@ -192,41 +185,40 @@ public boolean equals(Object options) { && this.falconPushInterval.toMillis() == clientOptions.falconPushInterval.toMillis() && this.enableWriteLimit == clientOptions.enableWriteLimit && this.metaQueryTimeout.toMillis() == clientOptions.metaQueryTimeout.toMillis() - && this.enableAuth == clientOptions.enableAuth - && this.serviceName == clientOptions.serviceName - && this.serviceFQDN == clientOptions.serviceFQDN; + && this.authProtocol == clientOptions.authProtocol + && this.credential == clientOptions.credential; } return false; } @Override public String toString() { - return "ClientOptions{" - + "metaServers='" - + metaServers - + '\'' - + ", operationTimeout(ms)=" - + operationTimeout.toMillis() - + ", asyncWorkers=" - + asyncWorkers - + ", enablePerfCounter=" - + enablePerfCounter - + ", falconPerfCounterTags='" - + falconPerfCounterTags - + '\'' - + ", falconPushInterval(s)=" - + falconPushInterval.getSeconds() - + ",enableWriteLimit=" - + enableWriteLimit - + ", metaQueryTimeout(ms)=" - + metaQueryTimeout.toMillis() - + ", enableAuth=" - + enableAuth - + ", serviceName=" - + serviceName - + ", serviceFQDN=" - + serviceFQDN - + '}'; + String res = + "ClientOptions{" + + "metaServers='" + + metaServers + + '\'' + + ", operationTimeout(ms)=" + + operationTimeout.toMillis() + + ", asyncWorkers=" + + asyncWorkers + + ", enablePerfCounter=" + + enablePerfCounter + + ", falconPerfCounterTags='" + + falconPerfCounterTags + + '\'' + + ", falconPushInterval(s)=" + + falconPushInterval.getSeconds() + + ",enableWriteLimit=" + + enableWriteLimit + + ", metaQueryTimeout(ms)=" + + metaQueryTimeout.toMillis() + + ", authProtocol=" + + authProtocol; + if (credential != null) { + res += ", credential=" + credential.toString(); + } + return res + '}'; } /** Builder for {@link ClientOptions}. */ @@ -239,9 +231,8 @@ public static class Builder { private Duration falconPushInterval = DEFAULT_FALCON_PUSH_INTERVAL; private boolean enableWriteLimit = DEFAULT_ENABLE_WRITE_LIMIT; private Duration metaQueryTimeout = DEFAULT_META_QUERY_TIMEOUT; - private boolean enableAuth = DEFAULT_ENABLE_AUTH; - private String serviceName = DEFAULT_SERVICE_NAME; - private String serviceFQDN = DEFAULT_SERVICE_FQDN; + private String authProtocol = DEFAULT_AUTH_PROTOCOL; + private Credential credential = null; protected Builder() {} @@ -345,37 +336,28 @@ public Builder metaQueryTimeout(Duration metaQueryTimeout) { } /** - * Whether to enable authentication. Defaults to {@literal false}, see {@link - * #DEFAULT_ENABLE_AUTH}. + * The authentiation protocol to use. Available protocols are: 1. kerberos; 2."" * - * @param enableAuth - * @return {@code this} - */ - public Builder enableAuth(boolean enableAuth) { - this.enableAuth = enableAuth; - return this; - } - - /** - * service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME}. + *
"" means the authentiation is disabled * - * @param serviceName + *
Defaults to {@literal ""}, See {@link #DEFAULT_AUTH_PROTOCOL} + * + * @param authProtocol authentiation protocol. * @return {@code this} */ - public Builder serviceName(String serviceName) { - this.serviceName = serviceName; + public Builder authProtocol(String authProtocol) { + this.authProtocol = authProtocol; return this; } /** - * service full qualified domain name. Defaults to {@literal ""}, see {@link - * #DEFAULT_SERVICE_FQDN}. + * credential info. Defaults to {@literal null} * - * @param serviceFQDN + * @param credential credential * @return {@code this} */ - public Builder serviceFQDN(String serviceFQDN) { - this.serviceFQDN = serviceFQDN; + public Builder credential(Credential credential) { + this.credential = credential; return this; } @@ -407,9 +389,8 @@ public ClientOptions.Builder mutate() { .falconPushInterval(getFalconPushInterval()) .enableWriteLimit(isWriteLimitEnabled()) .metaQueryTimeout(getMetaQueryTimeout()) - .enableAuth(isEnableAuth()) - .serviceName(getServiceName()) - .serviceFQDN(getServiceFQDN()); + .authProtocol(getAuthProtocol()) + .credential(getCredential()); return builder; } @@ -491,29 +472,24 @@ public Duration getMetaQueryTimeout() { } /** - * Whether to enable authentication. Defaults to {@literal false}. + * The authentiation protocol to use. Available protocols are: 1. kerberos; 2."" * - * @return whether to enable authentication. - */ - public boolean isEnableAuth() { - return enableAuth; - } - - /** - * service name. Defaults to {@literal ""}. + *
"" means the authentiation is disabled + * + *
Defaults to {@literal ""}, See {@link #DEFAULT_AUTH_PROTOCOL}
*
- * @return service name.
+ * @return authentiation protocol.
*/
- public String getServiceName() {
- return serviceName;
+ public String getAuthProtocol() {
+ return authProtocol;
}
/**
- * service full qualified domain name. Defaults to {@literal ""}.
+ * credential info. Defaults to {@literal null}
*
- * @return service full qualified domain name.
+ * @return credential
*/
- public String getServiceFQDN() {
- return serviceFQDN;
+ public Credential getCredential() {
+ return credential;
}
}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java
index d781bf7c..048f4a82 100644
--- a/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java
+++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/interceptor/ReplicaSessionInterceptorManager.java
@@ -20,6 +20,7 @@
import com.xiaomi.infra.pegasus.client.ClientOptions;
import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession;
+import com.xiaomi.infra.pegasus.security.AuthReplicaSessionInterceptor;
import java.util.ArrayList;
import java.util.List;
@@ -27,10 +28,9 @@ public class ReplicaSessionInterceptorManager {
private List To create a new instance with Configuration:
+ *
+ * {@code
+ * new KerberosCredential(config);
+ * }
+ *
+ * To customize the settings:
+ *
+ * {@code
+ * KerberosCredential credential =
+ * KerberosCredential.Builder()
+ * .serviceName("")
+ * .serviceFqdn("")
+ * .keytab("")
+ * .principal("")
+ * .build();
+ * }
+ */
+class KerberosCredential implements Credential {
+ public static final String PEGASUS_SERVICE_NAME_KEY = "kerberos_service_name";
+ public static final String PEGASUS_SERVICE_FQDN_KEY = "kerberos_service_fqdn";
+ public static final String PEGASUS_KEYTAB_KEY = "kerberos_keytab";
+ public static final String PEGASUS_PRINCIPAL_KEY = "kerberos_principal";
+
+ public static final String DEFAULT_SERVICE_NAME = "";
+ public static final String DEFAULT_SERVICE_FQDN = "";
+ public static final String DEFAULT_KEYTAB = "";
+ public static final String DEFAULT_PRINCIPAL = "";
+
+ private String serviceName;
+ private String serviceFqdn;
+ private String keyTab;
+ private String principal;
+
+ KerberosCredential(Configuration config) {
+ this.serviceName = config.getString(PEGASUS_SERVICE_NAME_KEY, DEFAULT_SERVICE_NAME);
+ this.serviceFqdn = config.getString(PEGASUS_SERVICE_FQDN_KEY, DEFAULT_SERVICE_FQDN);
+ this.keyTab = config.getString(PEGASUS_KEYTAB_KEY, DEFAULT_KEYTAB);
+ this.principal = config.getString(PEGASUS_PRINCIPAL_KEY, DEFAULT_PRINCIPAL);
+ }
+
+ KerberosCredential(Builder builder) {
+ this.serviceName = builder.serviceName;
+ this.serviceFqdn = builder.serviceFqdn;
+ this.keyTab = builder.keyTab;
+ this.principal = builder.principal;
+ }
+
+ @Override
+ public AuthProtocol getProtocol() {
+ return new KerberosProtocol(serviceName, serviceFqdn, keyTab, principal);
+ }
+
+ @Override
+ public String toString() {
+ return "KerberosCredential{"
+ + "serviceName='"
+ + serviceName
+ + '\''
+ + ", serviceFqdn='"
+ + serviceFqdn
+ + '\''
+ + ", keyTab='"
+ + keyTab
+ + '\''
+ + ", principal='"
+ + principal
+ + '\''
+ + '}';
+ }
+
+ public static class Builder {
+ private String serviceName;
+ private String serviceFqdn;
+ private String keyTab;
+ private String principal;
+
+ /**
+ * kerberos service name. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_NAME}
+ *
+ * @param serviceName
+ * @return {@code this}
+ */
+ public Builder serviceName(String serviceName) {
+ this.serviceName = serviceName;
+ return this;
+ }
+
+ /**
+ * kerberos service fqdn. Defaults to {@literal ""}, see {@link #DEFAULT_SERVICE_FQDN}
+ *
+ * @param serviceFqdn
+ * @return {@code this}
+ */
+ public Builder serviceFqdn(String serviceFqdn) {
+ this.serviceFqdn = serviceFqdn;
+ return this;
+ }
+
+ /**
+ * kerberos keytab file path. Defaults to {@literal ""}, see {@link #DEFAULT_KEYTAB}.
+ *
+ * @param keyTab
+ * @return {@code this}
+ */
+ public Builder keyTab(String keyTab) {
+ this.keyTab = keyTab;
+ return this;
+ }
+
+ /**
+ * kerberos principal. Defaults to {@literal ""}, see {@link #DEFAULT_PRINCIPAL}.
+ *
+ * @param principal
+ * @return {@code this}
+ */
+ public Builder principal(String principal) {
+ this.principal = principal;
+ return this;
+ }
+
+ /**
+ * Create a new instance of {@link KerberosCredential}.
+ *
+ * @return new instance of {@link KerberosCredential}.
+ */
+ public KerberosCredential build() {
+ return new KerberosCredential(this);
+ }
+ }
+}
diff --git a/src/main/java/com/xiaomi/infra/pegasus/security/KerberosProtocol.java b/src/main/java/com/xiaomi/infra/pegasus/security/KerberosProtocol.java
new file mode 100644
index 00000000..6ee8779d
--- /dev/null
+++ b/src/main/java/com/xiaomi/infra/pegasus/security/KerberosProtocol.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.xiaomi.infra.pegasus.security;
+
+import com.sun.security.auth.callback.TextCallbackHandler;
+import com.xiaomi.infra.pegasus.rpc.async.ReplicaSession;
+import java.util.HashMap;
+import java.util.Map;
+import javax.security.auth.Subject;
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KerberosProtocol implements AuthProtocol {
+ private static final Logger logger = LoggerFactory.getLogger(KerberosProtocol.class);
+
+ // Subject is a JAAS internal class, Ref:
+ // https://docs.oracle.com/javase/7/docs/technotes/guides/security/jaas/JAASRefGuide.html
+ //
+ // To authorize access to resources, applications first need to authenticate the source of the
+ // request. The JAAS framework defines the term "subject" to represent the source of a request. A
+ // subject may be any entity, such as a person or a service.
+ private Subject subject;
+ // The LoginContext class provides the basic methods used to authenticate subjects, and provides a
+ // way to develop an application independent of the underlying authentication technology
+ private LoginContext loginContext;
+ private String serviceName;
+ private String serviceFqdn;
+
+ KerberosProtocol(String serviceName, String serviceFqdn, String keyTab, String principal)
+ throws IllegalArgumentException {
+ this.serviceName = serviceName;
+ this.serviceFqdn = serviceFqdn;
+ try {
+ // Authenticate the Subject (the source of the request)
+ // A LoginModule uses a CallbackHandler to communicate with the user to obtain authentication
+ // information
+ this.subject = new Subject();
+ this.loginContext =
+ new LoginContext(
+ "pegasus-client",
+ subject,
+ new TextCallbackHandler(),
+ getLoginContextConfiguration(keyTab, principal));
+ this.loginContext.login();
+ } catch (LoginException le) {
+ throw new IllegalArgumentException("login failed: ", le);
+ }
+
+ logger.info("login succeed, as user {}", subject.getPrincipals().toString());
+ }
+
+ @Override
+ public void authenticate(ReplicaSession session) {
+ Negotiation negotiation = new Negotiation(session, subject, serviceName, serviceFqdn);
+ negotiation.start();
+ }
+
+ private static Configuration getLoginContextConfiguration(String keyTab, String principal) {
+ return new Configuration() {
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ Map