From 134713d705a8de555bed912ac84f5214d7bddc30 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 24 Sep 2018 15:39:07 -0700 Subject: [PATCH] [LIVY-494] Add thriftserver to Livy server ## What changes were proposed in this pull request? The PR adds a configuration parameter in order to startup also the thriftserver when starting Livy server. Apart from this trivial change, other 3 main things were needed and are present in this PR: - Add the thriftserver JARs to the assembly and the livy-server script; - A small refactor in order to enforce impersonation in the `*Session` classes, instead of in the `*Servlet` ones, so that it is picked up by the thriftserver module too (this change is not strictly needed, but I consider it a better option that duplicating this logic in the thriftserver module too); - Creating a UGI from the configured keytab. This is needed because the thriftserver requires the UGI to be created from a keytab in order to work properly and previously Livy was using a UGI generated from the cached TGT (created by the `kinit` command). ## How was this patch tested? Manual test: starting the server and having it up for more than 9 days. Author: Marco Gaido Closes #107 from mgaido91/LIVY-494. --- assembly/assembly.xml | 7 +++ assembly/pom.xml | 11 ++++ bin/livy-server | 5 ++ server/pom.xml | 6 ++ .../main/scala/org/apache/livy/LivyConf.scala | 1 + .../apache/livy/server/AccessManager.scala | 46 +++++++++++++++ .../org/apache/livy/server/LivyServer.scala | 23 ++++++++ .../apache/livy/server/SessionServlet.scala | 56 ++----------------- .../livy/server/ThriftServerFactory.scala | 40 +++++++++++++ .../livy/server/batch/BatchSession.scala | 9 +-- .../server/batch/BatchSessionServlet.scala | 10 +++- .../interactive/InteractiveSession.scala | 8 ++- .../InteractiveSessionServlet.scala | 5 +- .../livy/server/SessionServletSpec.scala | 8 ++- .../livy/server/batch/BatchSessionSpec.scala | 8 ++- .../interactive/InteractiveSessionSpec.scala | 4 +- .../livy/thriftserver/LivyThriftServer.scala | 23 ++++++-- .../LivyThriftSessionManager.scala | 2 +- .../ThriftServerFactoryImpl.scala | 37 ++++++++++++ 19 files changed, 236 insertions(+), 73 deletions(-) create mode 100644 server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala create mode 100644 thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala diff --git a/assembly/assembly.xml b/assembly/assembly.xml index e1dcdffc3..f63fc0b25 100644 --- a/assembly/assembly.xml +++ b/assembly/assembly.xml @@ -75,5 +75,12 @@ * + + ${project.parent.basedir}/thriftserver/server/target/jars + ${assembly.name}/jars + + * + + diff --git a/assembly/pom.xml b/assembly/pom.xml index 1aeb908af..470f24d60 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -91,6 +91,17 @@ tar.gz + + + thriftserver + + + ${project.groupId} + livy-thriftserver + ${project.version} + + + diff --git a/bin/livy-server b/bin/livy-server index eeb57fb11..8d27d4e61 100755 --- a/bin/livy-server +++ b/bin/livy-server @@ -79,10 +79,15 @@ start_livy_server() { LIBDIR="$LIVY_HOME/jars" if [ ! -d "$LIBDIR" ]; then LIBDIR="$LIVY_HOME/server/target/jars" + THRIFT_LIBDIR="$LIVY_HOME/thriftserver/server/target/jars" fi if [ ! -d "$LIBDIR" ]; then echo "Could not find Livy jars directory." 1>&2 exit 1 + else + if [ -d "$THRIFT_LIBDIR" ]; then + LIBDIR="$THRIFT_LIBDIR/*:$LIBDIR" + fi fi LIVY_CLASSPATH="$LIBDIR/*:$LIVY_CONF_DIR" diff --git a/server/pom.xml b/server/pom.xml index aa14b6d35..0ebc32922 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -98,6 +98,12 @@ org.apache.hadoop hadoop-auth ${hadoop.scope} + + + jline + jline + + diff --git a/server/src/main/scala/org/apache/livy/LivyConf.scala b/server/src/main/scala/org/apache/livy/LivyConf.scala index 73544c890..9e61f83c8 100644 --- a/server/src/main/scala/org/apache/livy/LivyConf.scala +++ b/server/src/main/scala/org/apache/livy/LivyConf.scala @@ -99,6 +99,7 @@ object LivyConf { val LAUNCH_KERBEROS_REFRESH_INTERVAL = Entry("livy.server.launch.kerberos.refresh-interval", "1h") val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5) + val THRIFT_SERVER_ENABLED = Entry("livy.server.thrift.enabled", false) val THRIFT_INCR_COLLECT_ENABLED = Entry("livy.server.thrift.incrementalCollect", false) val THRIFT_SESSION_CREATION_TIMEOUT = Entry("livy.server.thrift.session.creationTimeout", "10m") val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", null) diff --git a/server/src/main/scala/org/apache/livy/server/AccessManager.scala b/server/src/main/scala/org/apache/livy/server/AccessManager.scala index 346626a7c..c86801e44 100644 --- a/server/src/main/scala/org/apache/livy/server/AccessManager.scala +++ b/server/src/main/scala/org/apache/livy/server/AccessManager.scala @@ -17,6 +17,8 @@ package org.apache.livy.server +import java.security.AccessControlException + import org.apache.livy.{LivyConf, Logging} private[livy] class AccessManager(conf: LivyConf) extends Logging { @@ -94,4 +96,48 @@ private[livy] class AccessManager(conf: LivyConf) extends Logging { * Check whether access control is enabled or not. */ def isAccessControlOn: Boolean = aclsOn + + /** + * Checks that the requesting user can impersonate the target user. + * If the user does not have permission to impersonate, then throws an `AccessControlException`. + * + * @return The user that should be impersonated. That can be the target user if defined, the + * request's user - which may not be defined - otherwise, or `None` if impersonation is + * disabled. + */ + def checkImpersonation( + target: Option[String], + requestUser: String, + livyConf: LivyConf): Option[String] = { + if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) { + if (!target.forall(hasSuperAccess(_, requestUser))) { + throw new AccessControlException( + s"User '$requestUser' not allowed to impersonate '$target'.") + } + target.orElse(Option(requestUser)) + } else { + None + } + } + + /** + * Check that the requesting user has admin access to resources owned by the given target user. + */ + def hasSuperAccess(target: String, requestUser: String): Boolean = { + requestUser == target || checkSuperUser(requestUser) + } + + /** + * Check that the request's user has modify access to resources owned by the given target user. + */ + def hasModifyAccess(target: String, requestUser: String): Boolean = { + requestUser == target || checkModifyPermissions(requestUser) + } + + /** + * Check that the request's user has view access to resources owned by the given target user. + */ + def hasViewAccess(target: String, requestUser: String): Boolean = { + requestUser == target || checkViewPermissions(requestUser) + } } diff --git a/server/src/main/scala/org/apache/livy/server/LivyServer.scala b/server/src/main/scala/org/apache/livy/server/LivyServer.scala index b0b84a2d3..525f560a4 100644 --- a/server/src/main/scala/org/apache/livy/server/LivyServer.scala +++ b/server/src/main/scala/org/apache/livy/server/LivyServer.scala @@ -56,6 +56,8 @@ class LivyServer extends Logging { private var executor: ScheduledExecutorService = _ private var accessManager: AccessManager = _ + private var ugi: UserGroupInformation = _ + def start(): Unit = { livyConf = new LivyConf().loadFromFile("livy.conf") accessManager = new AccessManager(livyConf) @@ -115,6 +117,16 @@ class LivyServer extends Logging { error("Failed to run kinit, stopping the server.") sys.exit(1) } + // This is and should be the only place where a login() on the UGI is performed. + // If an other login in the codebase is strictly needed, a needLogin check should be added to + // avoid anyway that 2 logins are performed. + // This is needed because the thriftserver requires the UGI to be created from a keytab in + // order to work properly and previously Livy was using a UGI generated from the cached TGT + // (created by the kinit command). + if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) { + UserGroupInformation.loginUserFromKeytab(launch_principal, launch_keytab) + } + ugi = UserGroupInformation.getCurrentUser startKinitThread(launch_keytab, launch_principal) } @@ -266,6 +278,11 @@ class LivyServer extends Logging { } }) + if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) { + ThriftServerFactory.getInstance.start( + livyConf, interactiveSessionManager, sessionStore, accessManager) + } + _serverUrl = Some(s"${server.protocol}://${server.host}:${server.port}") sys.props("livy.server.server-url") = _serverUrl.get } @@ -292,6 +309,12 @@ class LivyServer extends Logging { new Runnable() { override def run(): Unit = { if (runKinit(keytab, principal)) { + // The current UGI should never change. If that happens, it is an error condition and + // relogin the original UGI would not update the current UGI. So the server will fail + // due to no valid credentials. The assert here allows to fast detect this error + // condition and fail immediately with a meaningful error. + assert(ugi.equals(UserGroupInformation.getCurrentUser), "Current UGI has changed.") + ugi.reloginFromTicketCache() // schedule another kinit run with a fixed delay. executor.schedule(this, refreshInterval, TimeUnit.MILLISECONDS) } else { diff --git a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala index 69418a855..928aa05dd 100644 --- a/server/src/main/scala/org/apache/livy/server/SessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/SessionServlet.scala @@ -17,6 +17,7 @@ package org.apache.livy.server +import java.security.AccessControlException import javax.servlet.http.HttpServletRequest import org.scalatra._ @@ -149,6 +150,7 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( error { case e: IllegalArgumentException => BadRequest(e.getMessage) + case e: AccessControlException => Forbidden(e.getMessage) } /** @@ -156,52 +158,6 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( */ protected def remoteUser(req: HttpServletRequest): String = req.getRemoteUser() - /** - * Checks that the request's user can impersonate the target user. - * - * If the user does not have permission to impersonate, then halt execution. - * - * @return The user that should be impersonated. That can be the target user if defined, the - * request's user - which may not be defined - otherwise, or `None` if impersonation is - * disabled. - */ - protected def checkImpersonation( - target: Option[String], - req: HttpServletRequest): Option[String] = { - if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) { - if (!target.map(hasSuperAccess(_, req)).getOrElse(true)) { - halt(Forbidden(s"User '${remoteUser(req)}' not allowed to impersonate '$target'.")) - } - target.orElse(Option(remoteUser(req))) - } else { - None - } - } - - /** - * Check that the request's user has view access to resources owned by the given target user. - */ - protected def hasViewAccess(target: String, req: HttpServletRequest): Boolean = { - val user = remoteUser(req) - user == target || accessManager.checkViewPermissions(user) - } - - /** - * Check that the request's user has modify access to resources owned by the given target user. - */ - protected def hasModifyAccess(target: String, req: HttpServletRequest): Boolean = { - val user = remoteUser(req) - user == target || accessManager.checkModifyPermissions(user) - } - - /** - * Check that the request's user has admin access to resources owned by the given target user. - */ - protected def hasSuperAccess(target: String, req: HttpServletRequest): Boolean = { - val user = remoteUser(req) - user == target || accessManager.checkSuperUser(user) - } - /** * Performs an operation on the session, without checking for ownership. Operations executed * via this method must not modify the session in any way, or return potentially sensitive @@ -214,22 +170,22 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata]( * session. */ protected def withViewAccessSession(fn: (S => Any)): Any = - doWithSession(fn, false, Some(hasViewAccess)) + doWithSession(fn, false, Some(accessManager.hasViewAccess)) /** * Performs an operation on the session, verifying whether the caller has view access of the * session. */ protected def withModifyAccessSession(fn: (S => Any)): Any = - doWithSession(fn, false, Some(hasModifyAccess)) + doWithSession(fn, false, Some(accessManager.hasModifyAccess)) private def doWithSession(fn: (S => Any), allowAll: Boolean, - checkFn: Option[(String, HttpServletRequest) => Boolean]): Any = { + checkFn: Option[(String, String) => Boolean]): Any = { val sessionId = params("id").toInt sessionManager.get(sessionId) match { case Some(session) => - if (allowAll || checkFn.map(_(session.owner, request)).getOrElse(false)) { + if (allowAll || checkFn.map(_(session.owner, remoteUser(request))).getOrElse(false)) { fn(session) } else { Forbidden() diff --git a/server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala b/server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala new file mode 100644 index 000000000..b6f7d9d86 --- /dev/null +++ b/server/src/main/scala/org/apache/livy/server/ThriftServerFactory.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.server + +import org.apache.livy.LivyConf +import org.apache.livy.server.recovery.SessionStore +import org.apache.livy.sessions.InteractiveSessionManager + +/** + * Its implementation starts Livy ThriftServer + */ +trait ThriftServerFactory { + def start( + livyConf: LivyConf, + livySessionManager: InteractiveSessionManager, + sessionStore: SessionStore, + accessManager: AccessManager): Unit +} + +object ThriftServerFactory { + def getInstance: ThriftServerFactory = { + Class.forName("org.apache.livy.thriftserver.ThriftServerFactoryImpl").newInstance() + .asInstanceOf[ThriftServerFactory] + } +} diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala index 24c6cfd83..3d1e6cd8a 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSession.scala @@ -26,8 +26,8 @@ import scala.util.Random import com.fasterxml.jackson.annotation.JsonIgnoreProperties import org.apache.livy.{LivyConf, Logging, Utils} +import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore -import org.apache.livy.server.SessionServlet import org.apache.livy.sessions.{Session, SessionState} import org.apache.livy.sessions.Session._ import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder} @@ -55,11 +55,12 @@ object BatchSession extends Logging { id: Int, request: CreateBatchRequest, livyConf: LivyConf, + accessManager: AccessManager, owner: String, - proxyUser: Option[String], sessionStore: SessionStore, mockApp: Option[SparkApp] = None): BatchSession = { val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}" + val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, owner, livyConf) def createSparkApp(s: BatchSession): SparkApp = { val conf = SparkApp.prepareSparkConf( @@ -72,7 +73,7 @@ object BatchSession extends Logging { val builder = new SparkProcessBuilder(livyConf) builder.conf(conf) - proxyUser.foreach(builder.proxyUser) + impersonatedUser.foreach(builder.proxyUser) request.className.foreach(builder.className) request.driverMemory.foreach(builder.driverMemory) request.driverCores.foreach(builder.driverCores) @@ -116,7 +117,7 @@ object BatchSession extends Logging { SessionState.Starting, livyConf, owner, - proxyUser, + impersonatedUser, sessionStore, mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp)) } diff --git a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala index 85945d988..a069a50bf 100644 --- a/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/batch/BatchSessionServlet.scala @@ -42,16 +42,20 @@ class BatchSessionServlet( override protected def createSession(req: HttpServletRequest): BatchSession = { val createRequest = bodyAs[CreateBatchRequest](req) - val proxyUser = checkImpersonation(createRequest.proxyUser, req) BatchSession.create( - sessionManager.nextId(), createRequest, livyConf, remoteUser(req), proxyUser, sessionStore) + sessionManager.nextId(), + createRequest, + livyConf, + accessManager, + remoteUser(req), + sessionStore) } override protected[batch] def clientSessionView( session: BatchSession, req: HttpServletRequest): Any = { val logs = - if (hasViewAccess(session.owner, req)) { + if (accessManager.hasViewAccess(session.owner, remoteUser(req))) { val lines = session.logLines() val size = 10 diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala index 8bb164182..43a61ac14 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSession.scala @@ -39,6 +39,7 @@ import org.apache.livy._ import org.apache.livy.client.common.HttpMessages._ import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf} import org.apache.livy.rsc.driver.Statement +import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions._ import org.apache.livy.sessions.Session._ @@ -66,13 +67,14 @@ object InteractiveSession extends Logging { def create( id: Int, owner: String, - proxyUser: Option[String], livyConf: LivyConf, + accessManager: AccessManager, request: CreateInteractiveRequest, sessionStore: SessionStore, mockApp: Option[SparkApp] = None, mockClient: Option[RSCClient] = None): InteractiveSession = { val appTag = s"livy-session-$id-${Random.alphanumeric.take(8).mkString}" + val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, owner, livyConf) val client = mockClient.orElse { val conf = SparkApp.prepareSparkConf(appTag, livyConf, prepareConf( @@ -101,7 +103,7 @@ object InteractiveSession extends Logging { .setAll(builderProperties.asJava) .setConf("livy.client.session-id", id.toString) .setConf(RSCConf.Entry.DRIVER_CLASS.key(), "org.apache.livy.repl.ReplDriver") - .setConf(RSCConf.Entry.PROXY_USER.key(), proxyUser.orNull) + .setConf(RSCConf.Entry.PROXY_USER.key(), impersonatedUser.orNull) .setURI(new URI("rsc:/")) Option(builder.build().asInstanceOf[RSCClient]) @@ -117,7 +119,7 @@ object InteractiveSession extends Logging { request.heartbeatTimeoutInSecond, livyConf, owner, - proxyUser, + impersonatedUser, sessionStore, mockApp) } diff --git a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala index 4d614f41b..7450cd7c8 100644 --- a/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala +++ b/server/src/main/scala/org/apache/livy/server/interactive/InteractiveSessionServlet.scala @@ -52,12 +52,11 @@ class InteractiveSessionServlet( override protected def createSession(req: HttpServletRequest): InteractiveSession = { val createRequest = bodyAs[CreateInteractiveRequest](req) - val proxyUser = checkImpersonation(createRequest.proxyUser, req) InteractiveSession.create( sessionManager.nextId(), remoteUser(req), - proxyUser, livyConf, + accessManager, createRequest, sessionStore) } @@ -66,7 +65,7 @@ class InteractiveSessionServlet( session: InteractiveSession, req: HttpServletRequest): Any = { val logs = - if (hasViewAccess(session.owner, req)) { + if (accessManager.hasViewAccess(session.owner, remoteUser(req))) { Option(session.logLines()) .map { lines => val size = 10 diff --git a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala index f07c29619..e0ebd9a0b 100644 --- a/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/SessionServletSpec.scala @@ -62,14 +62,18 @@ object SessionServletSpec { new SessionServlet(sessionManager, conf, accessManager) with RemoteUserOverride { override protected def createSession(req: HttpServletRequest): Session = { val params = bodyAs[Map[String, String]](req) - checkImpersonation(params.get(PROXY_USER), req) + accessManager.checkImpersonation(params.get(PROXY_USER), remoteUser(req), livyConf) new MockSession(sessionManager.nextId(), remoteUser(req), conf) } override protected def clientSessionView( session: Session, req: HttpServletRequest): Any = { - val logs = if (hasViewAccess(session.owner, req)) session.logLines() else Nil + val logs = if (accessManager.hasViewAccess(session.owner, remoteUser(req))) { + session.logLines() + } else { + Nil + } MockSessionView(session.id, session.owner, logs) } } diff --git a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala index 196d328e2..8381c9530 100644 --- a/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/batch/BatchSessionSpec.scala @@ -30,6 +30,7 @@ import org.scalatest.{BeforeAndAfter, FunSpec, ShouldMatchers} import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf, Utils} +import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions.SessionState import org.apache.livy.utils.{AppInfo, SparkApp} @@ -68,7 +69,8 @@ class BatchSessionSpec req.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path")) val conf = new LivyConf().set(LivyConf.LOCAL_FS_WHITELIST, sys.props("java.io.tmpdir")) - val batch = BatchSession.create(0, req, conf, null, None, sessionStore) + val accessManager = new AccessManager(conf) + val batch = BatchSession.create(0, req, conf, accessManager, null, sessionStore) Utils.waitUntil({ () => !batch.state.isActive }, Duration(10, TimeUnit.SECONDS)) (batch.state match { @@ -83,7 +85,9 @@ class BatchSessionSpec val conf = new LivyConf() val req = new CreateBatchRequest() val mockApp = mock[SparkApp] - val batch = BatchSession.create(0, req, conf, null, None, sessionStore, Some(mockApp)) + val accessManager = new AccessManager(conf) + val batch = BatchSession.create( + 0, req, conf, accessManager, null, sessionStore, Some(mockApp)) val expectedAppId = "APPID" batch.appIdKnown(expectedAppId) diff --git a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala index 9ee4cd85f..20e1f2dba 100644 --- a/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala +++ b/server/src/test/scala/org/apache/livy/server/interactive/InteractiveSessionSpec.scala @@ -36,6 +36,7 @@ import org.scalatest.mock.MockitoSugar.mock import org.apache.livy.{ExecuteRequest, JobHandle, LivyBaseUnitTestSuite, LivyConf} import org.apache.livy.rsc.{PingJob, RSCClient, RSCConf} import org.apache.livy.rsc.driver.StatementState +import org.apache.livy.server.AccessManager import org.apache.livy.server.recovery.SessionStore import org.apache.livy.sessions.{PySpark, SessionState, Spark} import org.apache.livy.utils.{AppInfo, SparkApp} @@ -51,6 +52,7 @@ class InteractiveSessionSpec extends FunSpec implicit val formats = DefaultFormats private var session: InteractiveSession = null + private val accessManager = new AccessManager(livyConf) private def createSession( sessionStore: SessionStore = mock[SessionStore], @@ -68,7 +70,7 @@ class InteractiveSessionSpec extends FunSpec SparkLauncher.DRIVER_EXTRA_CLASSPATH -> sys.props("java.class.path"), RSCConf.Entry.LIVY_JARS.key() -> "" ) - InteractiveSession.create(0, null, None, livyConf, req, sessionStore, mockApp) + InteractiveSession.create(0, null, livyConf, accessManager, req, sessionStore, mockApp) } private def executeStatement(code: String, codeType: Option[String] = None): JValue = { diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala index d34c1c0e8..c6702176b 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftServer.scala @@ -17,11 +17,13 @@ package org.apache.livy.thriftserver +import java.security.PrivilegedExceptionAction + import scala.collection.JavaConverters._ import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.security.UserGroupInformation import org.apache.hive.service.server.HiveServer2 -import org.scalatra.ScalatraServlet import org.apache.livy.{LivyConf, Logging} import org.apache.livy.server.AccessManager @@ -61,6 +63,7 @@ object LivyThriftServer extends Logging { accessManager: AccessManager): Unit = synchronized { if (thriftServerThread == null) { info("Starting LivyThriftServer") + val ugi = UserGroupInformation.getCurrentUser val runThriftServer = new Runnable { override def run(): Unit = { try { @@ -69,8 +72,15 @@ object LivyThriftServer extends Logging { livySessionManager, sessionStore, accessManager) - thriftServer.init(hiveConf(livyConf)) - thriftServer.start() + if (UserGroupInformation.isSecurityEnabled) { + ugi.doAs(new PrivilegedExceptionAction[Unit] { + override def run(): Unit = { + doStart(livyConf) + } + }) + } else { + doStart(livyConf) + } info("LivyThriftServer started") } catch { case e: Exception => @@ -86,6 +96,11 @@ object LivyThriftServer extends Logging { } } + private def doStart(livyConf: LivyConf): Unit = { + thriftServer.init(hiveConf(livyConf)) + thriftServer.start() + } + private[thriftserver] def getInstance: Option[LivyThriftServer] = { Option(thriftServer) } @@ -106,7 +121,7 @@ class LivyThriftServer( private[thriftserver] val livyConf: LivyConf, private[thriftserver] val livySessionManager: InteractiveSessionManager, private[thriftserver] val sessionStore: SessionStore, - private val accessManager: AccessManager) extends HiveServer2 { + private[thriftserver] val accessManager: AccessManager) extends HiveServer2 { override def init(hiveConf: HiveConf): Unit = { this.cliService = new LivyCLIService(this) super.init(hiveConf) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala index ec987c53c..7c7b265c0 100644 --- a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/LivyThriftSessionManager.scala @@ -249,8 +249,8 @@ class LivyThriftSessionManager(val server: LivyThriftServer, hiveConf: HiveConf) val newSession = InteractiveSession.create( server.livySessionManager.nextId(), username, - None, server.livyConf, + server.accessManager, createInteractiveRequest, server.sessionStore) onLivySessionOpened(newSession) diff --git a/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala new file mode 100644 index 000000000..5cc570755 --- /dev/null +++ b/thriftserver/server/src/main/scala/org/apache/livy/thriftserver/ThriftServerFactoryImpl.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.livy.thriftserver + +import org.apache.livy.LivyConf +import org.apache.livy.server.{AccessManager, ThriftServerFactory} +import org.apache.livy.server.recovery.SessionStore +import org.apache.livy.sessions.InteractiveSessionManager + +class ThriftServerFactoryImpl extends ThriftServerFactory { + override def start( + livyConf: LivyConf, + livySessionManager: InteractiveSessionManager, + sessionStore: SessionStore, + accessManager: AccessManager): Unit = { + if (LivyThriftServer.getInstance.isDefined) { + throw new RuntimeException(s"A ${classOf[LivyThriftServer].getName} has been already " + + s"started, so a new one cannot be started.") + } + LivyThriftServer.start(livyConf, livySessionManager, sessionStore, accessManager) + } +}