Skip to content

Commit

Permalink
rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
zhzhan committed Oct 13, 2014
2 parents 4cb1b93 + 4a2e36d commit ab028d1
Show file tree
Hide file tree
Showing 101 changed files with 3,499 additions and 654 deletions.
51 changes: 38 additions & 13 deletions bin/pyspark
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,47 @@ fi

. "$FWDIR"/bin/load-spark-env.sh

# Figure out which Python executable to use
# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
# executable, while the worker would still be launched using PYSPARK_PYTHON.
#
# In Spark 1.2, we removed the documentation of the IPYTHON and IPYTHON_OPTS variables and added
# PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS to allow IPython to be used for the driver.
# Now, users can simply set PYSPARK_DRIVER_PYTHON=ipython to use IPython and set
# PYSPARK_DRIVER_PYTHON_OPTS to pass options when starting the Python driver
# (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython
# and executor Python executables.
#
# For backwards-compatibility, we retain the old IPYTHON and IPYTHON_OPTS variables.

# Determine the Python executable to use if PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON isn't set:
if hash python2.7 2>/dev/null; then
# Attempt to use Python 2.7, if installed:
DEFAULT_PYTHON="python2.7"
else
DEFAULT_PYTHON="python"
fi

# Determine the Python executable to use for the driver:
if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
# If IPython options are specified, assume user wants to run IPython
# (for backwards-compatibility)
PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
PYSPARK_DRIVER_PYTHON="ipython"
elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
fi

# Determine the Python executable to use for the executors:
if [[ -z "$PYSPARK_PYTHON" ]]; then
if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON="ipython"
if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then
echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
exit 1
else
PYSPARK_PYTHON="python"
PYSPARK_PYTHON="$DEFAULT_PYTHON"
fi
fi
export PYSPARK_PYTHON

if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
fi

# Add the PySpark classes to the Python path:
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
Expand Down Expand Up @@ -93,9 +118,9 @@ if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
exec "$PYSPARK_PYTHON" $1
exec "$PYSPARK_DRIVER_PYTHON" $1
fi
exit
fi
Expand All @@ -111,5 +136,5 @@ if [[ "$1" =~ \.py$ ]]; then
else
# PySpark shell requires special handling downstream
export PYSPARK_SHELL=1
exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
fi
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.language.implicitConversions

import java.io._
import java.net.URI
import java.util.Arrays
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
Expand Down Expand Up @@ -1429,7 +1430,10 @@ object SparkContext extends Logging {
simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
simpleWritableConverter[Array[Byte], BytesWritable](bw =>
// getBytes method returns array which is longer then data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
)
}

implicit def stringWritableConverter(): WritableConverter[String] =
Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
import com.google.common.io.Files

import org.apache.spark.util.Utils

/**
* Utilities for tests. Included in main codebase since it's used by multiple
* projects.
Expand All @@ -42,8 +44,7 @@ private[spark] object TestUtils {
* in order to avoid interference between tests.
*/
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
val tempDir = Files.createTempDir()
tempDir.deleteOnExit()
val tempDir = Utils.createTempDir()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
Expand Down
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.{Try, Success, Failure}

import net.razorvine.pickle.{Pickler, Unpickler}

Expand All @@ -42,7 +40,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

private[spark] class PythonRDD(
parent: RDD[_],
@transient parent: RDD[_],
command: Array[Byte],
envVars: JMap[String, String],
pythonIncludes: JList[String],
Expand All @@ -55,9 +53,9 @@ private[spark] class PythonRDD(
val bufferSize = conf.getInt("spark.buffer.size", 65536)
val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)

override def getPartitions = parent.partitions
override def getPartitions = firstParent.partitions

override val partitioner = if (preservePartitoning) parent.partitioner else None
override val partitioner = if (preservePartitoning) firstParent.partitioner else None

override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
Expand Down Expand Up @@ -234,7 +232,7 @@ private[spark] class PythonRDD(
dataOut.writeInt(command.length)
dataOut.write(command)
// Data values
PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
PythonRDD.writeIteratorToStream(firstParent.iterator(split, context), dataOut)
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
dataOut.flush()
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
val worker = pb.start()

// Redirect worker stdout and stderr
Expand Down Expand Up @@ -149,10 +151,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
daemon = pb.start()

val in = new DataInputStream(daemon.getInputStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ object PythonRunner {
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
val pythonExec =
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
Expand All @@ -57,6 +58,7 @@ object PythonRunner {
val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,27 @@ sealed abstract class ManagedBuffer {
final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
extends ManagedBuffer {

/**
* Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
* Avoid unless there's a good reason not to.
*/
private val MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

override def size: Long = length

override def nioByteBuffer(): ByteBuffer = {
var channel: FileChannel = null
try {
channel = new RandomAccessFile(file, "r").getChannel
channel.map(MapMode.READ_ONLY, offset, length)
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
val buf = ByteBuffer.allocate(length.toInt)
channel.read(buf, offset)
buf.flip()
buf
} else {
channel.map(MapMode.READ_ONLY, offset, length)
}
} catch {
case e: IOException =>
Try(channel.size).toOption match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -748,9 +748,7 @@ private[nio] class ConnectionManager(
} catch {
case e: Exception => {
logError(s"Exception was thrown while processing message", e)
val m = Message.createBufferMessage(bufferMessage.id)
m.hasError = true
ackMessage = Some(m)
ackMessage = Some(Message.createErrorMessage(e, bufferMessage.id))
}
} finally {
sendMessage(connectionManagerId, ackMessage.getOrElse {
Expand Down Expand Up @@ -913,8 +911,12 @@ private[nio] class ConnectionManager(
}
case scala.util.Success(ackMessage) =>
if (ackMessage.hasError) {
val errorMsgByteBuf = ackMessage.asInstanceOf[BufferMessage].buffers.head
val errorMsgBytes = new Array[Byte](errorMsgByteBuf.limit())
errorMsgByteBuf.get(errorMsgBytes)
val errorMsg = new String(errorMsgBytes, "utf-8")
val e = new IOException(
"sendMessageReliably failed with ACK that signalled a remote error")
s"sendMessageReliably failed with ACK that signalled a remote error: $errorMsg")
if (!promise.tryFailure(e)) {
logWarning("Ignore error because promise is completed", e)
}
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/network/nio/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.util.Utils

private[nio] abstract class Message(val typ: Long, val id: Int) {
var senderAddress: InetSocketAddress = null
Expand Down Expand Up @@ -84,6 +85,19 @@ private[nio] object Message {
createBufferMessage(new Array[ByteBuffer](0), ackId)
}

/**
* Create a "negative acknowledgment" to notify a sender that an error occurred
* while processing its message. The exception's stacktrace will be formatted
* as a string, serialized into a byte array, and sent as the message payload.
*/
def createErrorMessage(exception: Exception, ackId: Int): BufferMessage = {
val exceptionString = Utils.exceptionString(exception)
val serializedExceptionString = ByteBuffer.wrap(exceptionString.getBytes("utf-8"))
val errorMessage = createBufferMessage(serializedExceptionString, ackId)
errorMessage.hasError = true
errorMessage
}

def create(header: MessageChunkHeader): Message = {
val newMessage: Message = header.typ match {
case BUFFER_MESSAGE => new BufferMessage(header.id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,14 @@ final class NioBlockTransferService(conf: SparkConf, securityManager: SecurityMa
} catch {
case e: Exception => {
logError("Exception handling buffer message", e)
val errorMessage = Message.createBufferMessage(msg.id)
errorMessage.hasError = true
Some(errorMessage)
Some(Message.createErrorMessage(e, msg.id))
}
}

case otherMessage: Any =>
logError("Unknown type message received: " + otherMessage)
val errorMessage = Message.createBufferMessage(msg.id)
errorMessage.hasError = true
Some(errorMessage)
val errorMsg = s"Received unknown message type: ${otherMessage.getClass.getName}"
logError(errorMsg)
Some(Message.createErrorMessage(new UnsupportedOperationException(errorMsg), msg.id))
}
}

Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,19 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobPr
<tr>
<td>{k}</td>
<td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
<td sorttable_customekey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
<td sorttable_customkey={v.taskTime.toString}>{UIUtils.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>
<td sorttable_customekey={v.inputBytes.toString}>
<td sorttable_customkey={v.inputBytes.toString}>
{Utils.bytesToString(v.inputBytes)}</td>
<td sorttable_customekey={v.shuffleRead.toString}>
<td sorttable_customkey={v.shuffleRead.toString}>
{Utils.bytesToString(v.shuffleRead)}</td>
<td sorttable_customekey={v.shuffleWrite.toString}>
<td sorttable_customkey={v.shuffleWrite.toString}>
{Utils.bytesToString(v.shuffleWrite)}</td>
<td sorttable_customekey={v.memoryBytesSpilled.toString}>
<td sorttable_customkey={v.memoryBytesSpilled.toString}>
{Utils.bytesToString(v.memoryBytesSpilled)}</td>
<td sorttable_customekey={v.diskBytesSpilled.toString}>
<td sorttable_customkey={v.diskBytesSpilled.toString}>
{Utils.bytesToString(v.diskBytesSpilled)}</td>
</tr>
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ private[ui] class StageTableBase(
{makeProgressBar(stageData.numActiveTasks, stageData.completedIndices.size,
stageData.numFailedTasks, s.numTasks)}
</td>
<td sorttable_customekey={inputRead.toString}>{inputReadWithUnit}</td>
<td sorttable_customekey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
<td sorttable_customekey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
<td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
<td sorttable_customkey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
<td sorttable_customkey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
}

/** Render an HTML row that represents a stage */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") {
</td>
<td>{rdd.numCachedPartitions}</td>
<td>{"%.0f%%".format(rdd.numCachedPartitions * 100.0 / rdd.numPartitions)}</td>
<td sorttable_customekey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
<td sorttable_customekey={rdd.tachyonSize.toString}>{Utils.bytesToString(rdd.tachyonSize)}</td>
<td sorttable_customekey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
<td sorttable_customkey={rdd.memSize.toString}>{Utils.bytesToString(rdd.memSize)}</td>
<td sorttable_customkey={rdd.tachyonSize.toString}>{Utils.bytesToString(rdd.tachyonSize)}</td>
<td sorttable_customkey={rdd.diskSize.toString} >{Utils.bytesToString(rdd.diskSize)}</td>
</tr>
// scalastyle:on
}
Expand Down
19 changes: 17 additions & 2 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,27 @@ private[spark] class FileLogger(
def this(
logDir: String,
sparkConf: SparkConf,
compress: Boolean = false,
overwrite: Boolean = true) = {
compress: Boolean,
overwrite: Boolean) = {
this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
overwrite = overwrite)
}

def this(
logDir: String,
sparkConf: SparkConf,
compress: Boolean) = {
this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = compress,
overwrite = true)
}

def this(
logDir: String,
sparkConf: SparkConf) = {
this(logDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf), compress = false,
overwrite = true)
}

private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
Expand Down
Loading

0 comments on commit ab028d1

Please sign in to comment.