Skip to content

Commit

Permalink
Merge pull request #7 from apache/master
Browse files Browse the repository at this point in the history
merge lastest spark
  • Loading branch information
pzzs committed Mar 26, 2015
2 parents d00303b + e87bf37 commit 802261c
Show file tree
Hide file tree
Showing 149 changed files with 2,760 additions and 1,367 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
*/
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
jobProgressListener.synchronized {
val jobData = jobProgressListener.jobIdToData.valuesIterator
jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ private[deploy] object DeployMessages {
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage

case class UnregisterApplication(appId: String)

case class MasterChangeAcknowledged(appId: String)

// Master to AppClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ private[spark] class AppClient(

case StopAppClient =>
markDead("Application has been stopped.")
master ! UnregisterApplication(appId)
sender ! true
context.stop(self)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private[deploy] class ApplicationInfo(
}
}

private[master] val requestedCores = desc.maxCores.getOrElse(defaultCores)
private val requestedCores = desc.maxCores.getOrElse(defaultCores)

private[master] def coresLeft: Int = requestedCores - coresGranted

Expand All @@ -111,6 +111,10 @@ private[deploy] class ApplicationInfo(
endTime = System.currentTimeMillis()
}

private[master] def isFinished: Boolean = {
state != ApplicationState.WAITING && state != ApplicationState.RUNNING
}

def duration: Long = {
if (endTime != -1) {
endTime - startTime
Expand Down
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,11 @@ private[master] class Master(
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and app
logInfo(s"Removing executor ${exec.fullId} because it is $state")
appInfo.removeExecutor(exec)
// If an application has already finished, preserve its
// state to display its information properly on the UI
if (!appInfo.isFinished) {
appInfo.removeExecutor(exec)
}
exec.worker.removeExecutor(exec)

val normalExit = exitStatus == Some(0)
Expand Down Expand Up @@ -428,6 +432,10 @@ private[master] class Master(
if (canCompleteRecovery) { completeRecovery() }
}

case UnregisterApplication(applicationId) =>
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)

case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,12 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
val workers = state.workers.sortBy(_.id)
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)

val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
"Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
"User", "State", "Duration")
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)

val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
"Submitted Time", "User", "State", "Duration")
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
completedApps)
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)

val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
"Memory", "Main Class")
Expand Down Expand Up @@ -191,7 +187,7 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
private def appRow(app: ApplicationInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(app.state == ApplicationState.RUNNING || app.state == ApplicationState.WAITING)) {
val killLinkUri = s"app/kill?id=${app.id}&terminate=true"
Expand All @@ -201,7 +197,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
(<a href={killLinkUri} onclick={confirm}>kill</a>)
</span>
}

<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
Expand All @@ -210,15 +205,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td>
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
</td>
{
if (active) {
<td>
{app.coresGranted}
</td>
}
}
<td>
{if (app.requestedCores == Int.MaxValue) "*" else app.requestedCores}
{app.coresGranted}
</td>
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
Expand All @@ -230,14 +218,6 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = true)
}

private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = false)
}

private def driverRow(driver: DriverInfo): Seq[Node] = {
val killLink = if (parent.killEnabled &&
(driver.state == DriverState.RUNNING ||
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ class TaskInfo(

def status: String = {
if (running) {
"RUNNING"
} else if (gettingResult) {
"GET RESULT"
if (gettingResult) {
"GET RESULT"
} else {
"RUNNING"
}
} else if (failed) {
"FAILED"
} else if (successful) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ class FileShuffleBlockManager(conf: SparkConf)
private val shuffleState = shuffleStates(shuffleId)
private var fileGroup: ShuffleFileGroup = null

val openStartTime = System.nanoTime
val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
Expand All @@ -135,6 +136,9 @@ class FileShuffleBlockManager(conf: SparkConf)
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
}
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, so should be included in the shuffle write time.
writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)

override def releaseWriters(success: Boolean) {
if (consolidateShuffleFiles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ private[spark] class SortShuffleWriter[K, V, C](
sorter.insertAll(records)
}

// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
val blockId = shuffleBlockManager.consolidateId(dep.shuffleId, mapId)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
Expand Down
23 changes: 18 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -535,9 +535,14 @@ private[spark] class BlockManager(
/* We'll store the bytes in memory if the block's storage level includes
* "memory serialized", or if it should be cached as objects in memory
* but we only requested its serialized bytes. */
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
memoryStore.putBytes(blockId, copyForMemory, level)
memoryStore.putBytes(blockId, bytes.limit, () => {
// https://issues.apache.org/jira/browse/SPARK-6076
// If the file size is bigger than the free memory, OOM will happen. So if we cannot
// put it into MemoryStore, copyForMemory should not be created. That's why this
// action is put into a `() => ByteBuffer` and created lazily.
val copyForMemory = ByteBuffer.allocate(bytes.limit)
copyForMemory.put(bytes)
})
bytes.rewind()
}
if (!asBlockResult) {
Expand Down Expand Up @@ -991,15 +996,23 @@ private[spark] class BlockManager(
putIterator(blockId, Iterator(value), level, tellMaster)
}

def dropFromMemory(
blockId: BlockId,
data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
dropFromMemory(blockId, () => data)
}

/**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
* store reaches its limit and needs to free up space.
*
* If `data` is not put on disk, it won't be created.
*
* Return the block status if the given block has been updated, else None.
*/
def dropFromMemory(
blockId: BlockId,
data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {
data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = {

logInfo(s"Dropping block $blockId from memory")
val info = blockInfo.get(blockId).orNull
Expand All @@ -1023,7 +1036,7 @@ private[spark] class BlockManager(
// Drop to disk, if storage level requires
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo(s"Writing block $blockId to disk")
data match {
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
case Right(bytes) =>
Expand Down
43 changes: 37 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,26 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

/**
* Use `size` to test if there is enough space in MemoryStore. If so, create the ByteBuffer and
* put it into MemoryStore. Otherwise, the ByteBuffer won't be created.
*
* The caller should guarantee that `size` is correct.
*/
def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]
val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)
val data =
if (putAttempt.success) {
assert(bytes.limit == size)
Right(bytes.duplicate())
} else {
null
}
PutResult(size, data, putAttempt.droppedBlocks)
}

override def putArray(
blockId: BlockId,
values: Array[Any],
Expand Down Expand Up @@ -312,11 +332,22 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
blockId.asRDDId.map(_.rddId)
}

private def tryToPut(
blockId: BlockId,
value: Any,
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {
tryToPut(blockId, () => value, size, deserialized)
}

/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an Array if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated) size
* must also be passed by the caller.
*
* `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
* created to avoid OOM since it may be a big ByteBuffer.
*
* Synchronize on `accountingLock` to ensure that all the put requests and its associated block
* dropping is done by only on thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
Expand All @@ -326,7 +357,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
*/
private def tryToPut(
blockId: BlockId,
value: Any,
value: () => Any,
size: Long,
deserialized: Boolean): ResultWithDroppedBlocks = {

Expand All @@ -345,7 +376,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
droppedBlocks ++= freeSpaceResult.droppedBlocks

if (enoughFreeSpace) {
val entry = new MemoryEntry(value, size, deserialized)
val entry = new MemoryEntry(value(), size, deserialized)
entries.synchronized {
entries.put(blockId, entry)
currentMemory += size
Expand All @@ -357,12 +388,12 @@ private[spark] class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
Left(value.asInstanceOf[Array[Any]])
lazy val data = if (deserialized) {
Left(value().asInstanceOf[Array[Any]])
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
Right(value().asInstanceOf[ByteBuffer].duplicate())
}
val droppedBlockStatus = blockManager.dropFromMemory(blockId, data)
val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)
droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }
}
// Release the unroll memory used because we no longer need the underlying Array
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.ui

import java.util.concurrent.Semaphore

import scala.util.Random

import org.apache.spark.{SparkConf, SparkContext}
Expand Down Expand Up @@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
)

val barrier = new Semaphore(-nJobSet * jobs.size + 1)

(1 to nJobSet).foreach { _ =>
for ((desc, job) <- jobs) {
new Thread {
Expand All @@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
} catch {
case e: Exception =>
println("Job Failed: " + desc)
} finally {
barrier.release()
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}

// Waiting for threads.
barrier.acquire()
sc.stop()
}
}
Loading

0 comments on commit 802261c

Please sign in to comment.