
Commit

Merge pull request apache#9 from andrewor14/ui-refactor
Merge with master
tdas committed Apr 12, 2014
2 parents f4f4cbe + 642dd88 commit fc73ca5
Showing 136 changed files with 1,633 additions and 653 deletions.
4 changes: 3 additions & 1 deletion .rat-excludes
@@ -39,4 +39,6 @@ work
.*\.q
golden
test.out/*
.*iml
.*iml
python/metastore/service.properties
python/metastore/db.lck
4 changes: 2 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -28,9 +28,9 @@ class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializ
class TestMessage(val targetId: String) extends Message[String] with Serializable

class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {

var sc: SparkContext = _

after {
if (sc != null) {
sc.stop()
FlatMapFunction.java
@@ -24,4 +24,4 @@
*/
public interface FlatMapFunction<T, R> extends Serializable {
public Iterable<R> call(T t) throws Exception;
}
}
FlatMapFunction2.java
@@ -24,4 +24,4 @@
*/
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
public Iterable<R> call(T1 t1, T2 t2) throws Exception;
}
}
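
For context, FlatMapFunction is the Java-API analogue of Scala's flatMap: call returns an Iterable holding zero or more output records per input element. A minimal sketch of implementing it from Scala follows, assuming the usual org.apache.spark.api.java.function package and a JavaRDD[String] for the usage comment; neither is shown in this excerpt.

import java.util.Arrays
import org.apache.spark.api.java.function.FlatMapFunction  // assumed package; the full file path is not captured above

// Split each input line into words; call() may return zero or more elements per input.
val splitWords = new FlatMapFunction[String, String] {
  def call(line: String): java.lang.Iterable[String] = Arrays.asList(line.split(" "): _*)
}

// Hypothetical usage against a JavaRDD[String] named lines:
//   lines.flatMap(splitWords)  // one output element per word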
9 changes: 9 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -78,3 +78,12 @@ table.sortable thead {
background-repeat: repeat-x;
filter: progid:dximagetransform.microsoft.gradient(startColorstr='#FFA4EDFF', endColorstr='#FF94DDFF', GradientType=0);
}

span.kill-link {
margin-right: 2px;
color: gray;
}

span.kill-link a {
color: gray;
}
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -24,13 +24,13 @@ import com.google.common.io.Files
import org.apache.spark.util.Utils

private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {

var baseDir : File = null
var fileDir : File = null
var jarDir : File = null
var httpServer : HttpServer = null
var serverUri : String = null

def initialize() {
baseDir = Utils.createTempDir()
fileDir = new File(baseDir, "files")
@@ -43,24 +43,24 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
}

def stop() {
httpServer.stop()
}

def addFile(file: File) : String = {
addFileToDir(file, fileDir)
serverUri + "/files/" + file.getName
}

def addJar(file: File) : String = {
addFileToDir(file, jarDir)
serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File) : String = {
Files.copy(file, new File(dir, file.getName))
dir + "/" + file.getName
}

}
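
Taken together, the hunks above show the server's whole lifecycle: initialize() creates the files/ and jars/ directories under a temp dir and starts the HTTP server, addFile and addJar copy a file into the matching directory and return the URL it is served under, and stop() shuts the server down. A minimal sketch of that lifecycle follows; HttpFileServer and SecurityManager are private[spark], so this mirrors what SparkEnv does internally (compiled inside the org.apache.spark package) rather than application code, and the paths are placeholders.

package org.apache.spark  // HttpFileServer and SecurityManager are private[spark]

import java.io.File

object FileServerSketch {  // hypothetical helper, not part of this commit
  def demo(): Unit = {
    val fileServer = new HttpFileServer(new SecurityManager(new SparkConf()))
    fileServer.initialize()                                      // create baseDir/files and baseDir/jars, start the server

    val fileUrl = fileServer.addFile(new File("/tmp/app.conf"))  // => <serverUri>/files/app.conf
    val jarUrl  = fileServer.addJar(new File("/tmp/app.jar"))    // => <serverUri>/jars/app.jar

    fileServer.stop()
  }
}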
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -83,19 +83,19 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
}
}

/**
/**
* Setup Jetty to the HashLoginService using a single user with our
* shared secret. Configure it to use DIGEST-MD5 authentication so that the password
* isn't passed in plaintext.
*/
private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
val constraint = new Constraint()
// use DIGEST-MD5 as the authentication mechanism
// use DIGEST-MD5 as the authentication mechanism
constraint.setName(Constraint.__DIGEST_AUTH)
constraint.setRoles(Array("user"))
constraint.setAuthenticate(true)
constraint.setDataConstraint(Constraint.DC_NONE)

val cm = new ConstraintMapping()
cm.setConstraint(constraint)
cm.setPathSpec("/*")
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Partition.scala
@@ -25,7 +25,7 @@ trait Partition extends Serializable {
* Get the split's index within its parent RDD
*/
def index: Int

// A better default implementation of HashCode
override def hashCode(): Int = index
}
88 changes: 44 additions & 44 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -25,93 +25,93 @@ import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil

/**
* Spark class responsible for security.
*
/**
* Spark class responsible for security.
*
* In general this class should be instantiated by the SparkEnv and most components
* should access it from that. There are some cases where the SparkEnv hasn't been
* should access it from that. There are some cases where the SparkEnv hasn't been
* initialized yet and this class must be instantiated directly.
*
*
* Spark currently supports authentication via a shared secret.
* Authentication can be configured to be on via the 'spark.authenticate' configuration
* parameter. This parameter controls whether the Spark communication protocols do
* parameter. This parameter controls whether the Spark communication protocols do
* authentication using the shared secret. This authentication is a basic handshake to
* make sure both sides have the same shared secret and are allowed to communicate.
* If the shared secret is not identical they will not be allowed to communicate.
*
* The Spark UI can also be secured by using javax servlet filters. A user may want to
* secure the UI if it has data that other users should not be allowed to see. The javax
* servlet filter specified by the user can authenticate the user and then once the user
* is logged in, Spark can compare that user versus the view acls to make sure they are
* authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
* If the shared secret is not identical they will not be allowed to communicate.
*
* The Spark UI can also be secured by using javax servlet filters. A user may want to
* secure the UI if it has data that other users should not be allowed to see. The javax
* servlet filter specified by the user can authenticate the user and then once the user
* is logged in, Spark can compare that user versus the view acls to make sure they are
* authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
* control the behavior of the acls. Note that the person who started the application
* always has view access to the UI.
*
* Spark does not currently support encryption after authentication.
*
*
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
* - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
* Akka remoting allows you to specify a secure cookie that will be exchanged
* and ensured to be identical in the connection handshake between the client
* and the server. If they are not identical then the client will be refused
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
* Akka remoting allows you to specify a secure cookie that will be exchanged
* and ensured to be identical in the connection handshake between the client
* and the server. If they are not identical then the client will be refused
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
* Akka also has an option to turn on SSL, this option is not currently supported
* but we could add a configuration option in the future.
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
* services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService
* to authenticate using DIGEST-MD5 via a single user and the shared secret.
* to authenticate using DIGEST-MD5 via a single user and the shared secret.
* Since we are using DIGEST-MD5, the shared secret is not passed on the wire
* in plaintext.
* We currently do not support SSL (https), but Jetty can be configured to use it
* so we could add a configuration option for this in the future.
*
*
* The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
* Any clients must specify the user and password. There is a default
* Any clients must specify the user and password. There is a default
* Authenticator installed in the SecurityManager to how it does the authentication
* and in this case gets the user name and password from the request.
*
* - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
* exchange messages. For this we use the Java SASL
* (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
* exchange messages. For this we use the Java SASL
* (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* as the authentication mechanism. This means the shared secret is not passed
* over the wire in plaintext.
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
* Spark currently supports "auth" for the quality of protection, which means
* the connection is not supporting integrity or privacy protection (encryption)
* after authentication. SASL also supports "auth-int" and "auth-conf" which
* after authentication. SASL also supports "auth-int" and "auth-conf" which
* SPARK could be support in the future to allow the user to specify the quality
* of protection they want. If we support those, the messages will also have to
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the connectionManager does asynchronous messages passing, the SASL
*
* Since the connectionManager does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
* and a Server, so for a particular connection is has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
* A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
* If its acting as a client and trying to send a message to another ConnectionManager,
* it blocks the thread calling sendMessage until the SASL negotiation has occurred.
* The ConnectionManager tracks all the sendingConnections using the ConnectionId
* and waits for the response from the server and does the handshake.
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
* properly. For non-Yarn deployments, users can write a filter to go through a
* companies normal login service. If an authentication filter is in place then the
* SparkUI can be configured to check the logged in user against the list of users who
* have view acls to see if that user is authorized.
* The filters can also be used for many different purposes. For instance filters
* The filters can also be used for many different purposes. For instance filters
* could be used for logging, encryption, or compression.
*
*
* The exact mechanisms used to generate/distributed the shared secret is deployment specific.
*
*
* For Yarn deployments, the secret is automatically generated using the Akka remote
* Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
* around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
@@ -121,7 +121,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
* filters to do authentication. That authentication then happens via the ResourceManager Proxy
* and Spark will use that to do authorization against the view acls.
*
*
* For other Spark deployments, the shared secret must be specified via the
* spark.authenticate.secret config.
* All the nodes (Master and Workers) and the applications need to have the same shared secret.
@@ -152,7 +152,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
" are ui acls enabled: " + uiAclsOn + " users with view permissions: " + viewAcls.toString())

// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
Authenticator.setDefault(
@@ -214,12 +214,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
def uiAclsEnabled(): Boolean = uiAclsOn

/**
* Checks the given user against the view acl list to see if they have
* Checks the given user against the view acl list to see if they have
* authorization to view the UI. If the UI acls must are disabled
* via spark.ui.acls.enable, all users have view access.
*
*
* @param user to see if is authorized
* @return true is the user has permission, otherwise false
* @return true is the user has permission, otherwise false
*/
def checkUIViewPermissions(user: String): Boolean = {
if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
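
The class doc above is easier to follow next to the configuration it describes. The sketch below turns on shared-secret authentication and UI view acls, then exercises the uiAclsEnabled and checkUIViewPermissions methods from this hunk. SecurityManager is private[spark], so this illustrates internal behaviour rather than a user-facing API, and the secret and user names are placeholders.

package org.apache.spark  // SecurityManager is private[spark]

object SecuritySketch {  // hypothetical, for illustration only
  def demo(): Unit = {
    val conf = new SparkConf()
      .set("spark.authenticate", "true")               // handshake with the shared secret on Spark's protocols
      .set("spark.authenticate.secret", "placeholder") // required outside YARN, where the secret is generated instead
      .set("spark.ui.acls.enable", "true")             // check the logged-in UI user against the view acls
      .set("spark.ui.view.acls", "alice,bob")          // users allowed to view the UI

    val securityManager = new SecurityManager(conf)

    securityManager.uiAclsEnabled()                    // true
    securityManager.checkUIViewPermissions("alice")    // true: listed in spark.ui.view.acls
    securityManager.checkUIViewPermissions("mallory")  // false, unless mallory is the user who started the app
  }
}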
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1137,6 +1137,16 @@ class SparkContext(config: SparkConf) extends Logging {
dagScheduler.cancelAllJobs()
}

/** Cancel a given job if it's scheduled or running */
private[spark] def cancelJob(jobId: Int) {
dagScheduler.cancelJob(jobId)
}

/** Cancel a given stage and all jobs associated with it */
private[spark] def cancelStage(stageId: Int) {
dagScheduler.cancelStage(stageId)
}

/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
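
Both new hooks are private[spark] and sit alongside the existing public cancellation API (for example cancelAllJobs above); given the kill-link styles added to webui.css earlier in this diff, they presumably back a kill action in the web UI. A minimal sketch of an internal caller follows, with a hypothetical handler object and ids.

package org.apache.spark.ui  // a child package of org.apache.spark, so private[spark] members are visible

import org.apache.spark.SparkContext

private[spark] object KillHandlerSketch {  // hypothetical; not the handler added by this PR
  def killStage(sc: SparkContext, stageId: Int): Unit = {
    sc.cancelStage(stageId)  // cancel the stage and all jobs associated with it
  }

  def killJob(sc: SparkContext, jobId: Int): Unit = {
    sc.cancelJob(jobId)      // cancel the job if it is scheduled or running
  }
}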
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkException.scala
@@ -20,5 +20,5 @@ package org.apache.spark
class SparkException(message: String, cause: Throwable)
extends Exception(message, cause) {

def this(message: String) = this(message, null)
def this(message: String) = this(message, null)
}
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -42,7 +42,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

private val now = new Date()
private val conf = new SerializableWritable(jobConf)

private var jobID = 0
private var splitID = 0
private var attemptID = 0
@@ -58,8 +58,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def preSetup() {
setIDs(0, 0, 0)
HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
val jCtxt = getJobContext()

val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
}

@@ -74,7 +74,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

val outputName = "part-" + numfmt.format(splitID)
val path = FileOutputFormat.getOutputPath(conf.value)
val fs: FileSystem = {
@@ -85,7 +85,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}

getOutputCommitter().setupTask(getTaskContext())
getOutputCommitter().setupTask(getTaskContext())
writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
}

@@ -103,18 +103,18 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
} catch {
case e: IOException => {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
}
}
} else {
logWarning ("No need to commit output of task: " + taID.value)
}
@@ -144,7 +144,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

private def getJobContext(): JobContext = {
if (jobContext == null) {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
jobContext
@@ -175,7 +175,7 @@ object SparkHadoopWriter {
val jobtrackerID = formatter.format(time)
new JobID(jobtrackerID, id)
}

def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")