Commit

[WIP] Try getting the event log download to work.

harishreedharan committed May 19, 2015
1 parent deb4113 commit 3d18ebc
Showing 9 changed files with 234 additions and 14 deletions.

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

@@ -17,6 +17,8 @@

package org.apache.spark.deploy.history

import java.io.File

import org.apache.spark.ui.SparkUI

private[spark] case class ApplicationAttemptInfo(
@@ -62,4 +64,10 @@ private[history] abstract class ApplicationHistoryProvider {
   */
  def getConfig(): Map[String, String] = Map()

  /**
   * Get the event logs for the given application. The event logs are compressed into a zip file
   * and copied into the directory passed in.
   */
  def copyApplicationEventLogs(appId: String, directory: File) = { }

}
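
The contract gives implementations latitude, but callers treat it uniformly: hand over an existing, writable directory, then pick up the zip left behind. A minimal usage sketch, assuming a concrete `provider` instance and an illustrative application id (neither appears in this diff):

import java.io.File

// Hypothetical caller; `provider` is any concrete ApplicationHistoryProvider.
val dir = new File(System.getProperty("java.io.tmpdir"), "spark-event-logs")
dir.mkdirs()
provider.copyApplicationEventLogs("app-20150519000000-0000", dir)
// FsHistoryProvider (next file) leaves a single eventLogs-<appId>.zip in `dir`.
dir.listFiles().foreach(f => println(s"${f.getName}: ${f.length()} bytes"))
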

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

@@ -17,8 +17,10 @@

package org.apache.spark.deploy.history

-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{FileOutputStream, File, BufferedInputStream, FileNotFoundException,
+  IOException, InputStream}
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.mutable

@@ -219,6 +221,47 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
    }
  }

  override def copyApplicationEventLogs(appId: String, directory: File): Unit = {
    val buffer = new Array[Byte](64 * 1024)
    /**
     * Copy the data from the path specified into a new [[ZipEntry]] with the remotePath's name.
     */
    def copyToZipStream(remotePath: Path, zipStream: ZipOutputStream): Unit = {
      val inputStream = fs.open(remotePath, 1 * 1024 * 1024) // 1MB buffer
      try {
        zipStream.putNextEntry(new ZipEntry(remotePath.getName))
        var dataRemaining = true
        while (dataRemaining) {
          val length = inputStream.read(buffer)
          if (length != -1) {
            zipStream.write(buffer, 0, length)
          } else {
            dataRemaining = false
          }
        }
        zipStream.closeEntry()
      } finally {
        inputStream.close() // close the stream even if the copy fails midway
      }
    }

    applications.get(appId).foreach { appInfo =>
      val outFile = new File(directory, s"eventLogs-$appId.zip")
      val zipStream = new ZipOutputStream(new FileOutputStream(outFile))
      appInfo.attempts.foreach { attempt =>
        val remotePath = new Path(logDir, attempt.logPath)
        if (isLegacyLogDirectory(fs.getFileStatus(remotePath))) {
          val filesIter = fs.listFiles(remotePath, true)
          while (filesIter.hasNext) {
            copyToZipStream(filesIter.next().getPath, zipStream)
          }
        } else {
          copyToZipStream(remotePath, zipStream)
        }
      }
      zipStream.finish()
      zipStream.close()
    }
  }


  /**
   * Replay the log files in the list and merge the list of old applications with new ones
   */
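Each attempt contributes one `ZipEntry`, named after the remote log file; legacy log directories are walked recursively, one entry per contained file. A small sketch for inspecting the resulting archive (the directory and app id are assumed, not part of this diff):

import java.io.{File, FileInputStream}
import java.util.zip.ZipInputStream

// Hypothetical check: print one line per log file bundled into the zip
// that copyApplicationEventLogs wrote.
val zip = new File("/tmp/spark-event-logs", "eventLogs-app-1.zip")
val zin = new ZipInputStream(new FileInputStream(zip))
try {
  Iterator.continually(zin.getNextEntry).takeWhile(_ != null).foreach { entry =>
    println(entry.getName)
  }
} finally {
  zin.close()
}
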

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

@@ -17,6 +17,7 @@

package org.apache.spark.deploy.history

import java.io.File
import java.util.NoSuchElementException
import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}

@@ -25,7 +26,7 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, ApiRootResource, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{SignalLogger, Utils}
@@ -125,7 +126,7 @@ class HistoryServer(
  def initialize() {
    attachPage(new HistoryPage(this))

-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getApiServlet(this))

    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))

@@ -172,6 +173,11 @@
    getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
  }

  def copyEventLogsToDirectory(appId: String, destDir: File): Unit = {
    provider.copyApplicationEventLogs(appId, destDir)
  }


  /**
   * Returns the provider configuration to show in the listing page.
   *

core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala

@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master.ui

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
-import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, ApiRootResource, UIRoot}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.RpcUtils
@@ -47,7 +47,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
    attachPage(new HistoryNotFoundPage(this))
    attachPage(masterPage)
    attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getApiServlet(this))
    attachHandler(createRedirectHandler(
      "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
    attachHandler(createRedirectHandler(

core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala → ApiRootResource.scala

@@ -39,7 +39,7 @@ import org.apache.spark.ui.SparkUI
 * HistoryServerSuite.
 */
@Path("/v1")
-private[v1] class JsonRootResource extends UIRootFromServletContext {
+private[v1] class ApiRootResource extends UIRootFromServletContext {

@Path("applications")
def getApplicationList(): ApplicationListResource = {
@@ -164,13 +164,22 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {
    }
  }

  @Path("applications/{appId}/{attemptId}/download")
  def getEventLogs(
      @PathParam("appId") appId: String,
      @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
      new EventLogDownloadResource(ui, appId)
    }
  }

}

-private[spark] object JsonRootResource {
+private[spark] object ApiRootResource {

-  def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
+  def getApiServlet(uiRoot: UIRoot): ServletContextHandler = {
    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
-    jerseyContext.setContextPath("/json")
+    jerseyContext.setContextPath("/api")
    val holder:ServletHolder = new ServletHolder(classOf[ServletContainer])
    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
      "com.sun.jersey.api.core.PackagesResourceConfig")
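With the Jersey context mounted at `/api` and the resource rooted at `/v1`, the new endpoint resolves to `/api/v1/applications/{appId}/{attemptId}/download`. A hedged client sketch, assuming a history server on its default port 18080 (the host, port, and ids here are illustrative, not part of this diff):

import java.io.{BufferedInputStream, FileOutputStream}
import java.net.URL

// Hypothetical client: stream the zip attachment to a local file.
val url = new URL("http://localhost:18080/api/v1/applications/app-1/attempt1/download")
val in = new BufferedInputStream(url.openStream())
val out = new FileOutputStream("eventLogs-app-1.zip")
try {
  val buffer = new Array[Byte](64 * 1024)
  var read = in.read(buffer)
  while (read != -1) {
    out.write(buffer, 0, read)
    read = in.read(buffer)
  }
} finally {
  in.close()
  out.close()
}
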

core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala (new file)

@@ -0,0 +1,70 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.status.api.v1

import java.io.{FileInputStream, OutputStream, File, InputStream}
import javax.ws.rs.{GET, Produces}
import javax.ws.rs.core.{MultivaluedMap, MediaType}

import org.apache.spark.deploy.history.HistoryServer
import org.apache.spark.util.Utils

@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
private[v1] class EventLogDownloadResource(val uIRoot: UIRoot, val appId: String) {

  @GET
  def getEventLogs(headers: MultivaluedMap[String, AnyRef], outputStream: OutputStream): Unit = {
    uIRoot match {
      case hs: HistoryServer =>
        val dir = Utils.createTempDir()
        Utils.chmod700(dir)
        hs.copyEventLogsToDirectory(appId, dir)
        dir.listFiles().headOption.foreach { zipFile =>
          headers.add("Content-Length", zipFile.length().toString)
          headers.add("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
          headers.add("Content-Disposition", s"attachment; filename=${zipFile.getName}")
          var inputStream: InputStream = null
          try {
            inputStream = new FileInputStream(zipFile)
            val buffer = new Array[Byte](1024 * 1024)
            var remaining = true
            while (remaining) {
              val read = inputStream.read(buffer)
              if (read != -1) {
                outputStream.write(buffer, 0, read)
              } else {
                remaining = false
              }
            }
            outputStream.flush()
          } finally {
            if (inputStream != null) {
              inputStream.close() // guard: the open itself may have failed
            }
            Utils.deleteRecursively(dir)
          }
        }
      case _ => outputStream.write(
        s"File download not available for application: $appId".getBytes("utf-8"))
    }
  }
}

private[v1] object EventLogDownloadResource {

  def unapply(resource: EventLogDownloadResource): Option[(UIRoot, String)] = {
    Some((resource.uIRoot, resource.appId))
  }
}
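
The companion `unapply` exists so the entity can be recognized by pattern match; `JacksonMessageWriter` (next file) uses it to divert the resource from JSON serialization to raw streaming. A minimal sketch of that dispatch, assuming `entity: Object`, `headers`, and `out` are in scope:

// Hypothetical illustration of the writer-side dispatch.
entity match {
  case d @ EventLogDownloadResource(_, _) =>
    d.getEventLogs(headers, out) // stream the zip instead of JSON-encoding it
  case _ =>
    // fall through to the normal Jackson JSON path
}
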

core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala

@@ -38,7 +38,7 @@ import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
 * Note that jersey automatically discovers this class based on its package and its annotations.
 */
@Provider
-@Produces(Array(MediaType.APPLICATION_JSON))
+@Produces(Array(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM))
private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{

  val mapper = new ObjectMapper() {
@@ -68,7 +68,9 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
      multivaluedMap: MultivaluedMap[String, AnyRef],
      outputStream: OutputStream): Unit = {
    t match {
-      case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+      case ErrorWrapper(err) => outputStream.write(err.getBytes())
+      case downloader @ EventLogDownloadResource(_, _) =>
+        downloader.getEventLogs(multivaluedMap, outputStream)
      case _ => mapper.writeValue(outputStream, t)
    }
  }
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala

@@ -19,7 +19,7 @@ package org.apache.spark.ui

import java.util.Date

-import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, ApiRootResource, UIRoot}
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -64,7 +64,7 @@ private[spark] class SparkUI private (
    attachTab(new ExecutorsTab(this))
    attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getApiServlet(this))
    // This should be POST only, but, the YARN AM proxy won't proxy POSTs
    attachHandler(createRedirectHandler(
      "/stages/stage/kill", "/stages", stagesTab.handleKillRequest,

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

@@ -17,11 +17,13 @@

package org.apache.spark.deploy.history

-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.io.{FileInputStream, BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
import java.net.URI
import java.util.concurrent.TimeUnit
import java.util.zip.ZipInputStream

import scala.io.Source
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path
import org.json4s.jackson.JsonMethods._
@@ -335,6 +337,86 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
    assert(!log2.exists())
  }

test("Event log copy") {

def getFileContent(file: File): Array[Byte] = {
val buff = new Array[Byte](file.length().toInt)
val in = new FileInputStream(file)
try {
in.read(buff)
} finally {
in.close()
}
buff
}

def unzipToDir(zipFile: File, outputDir: File): Unit = {
val zipStream = new ZipInputStream(new FileInputStream(zipFile))
try {
val buffer = new Array[Byte](128)
var entry = zipStream.getNextEntry
while (entry != null) {
val unzippedFile = new File(outputDir, entry.getName)
val ostream = new BufferedOutputStream(new FileOutputStream(unzippedFile))
try {
var dataRemains = true
while (dataRemains) {
val read = zipStream.read(buffer)
if (read > 0) ostream.write(buffer, 0, read) else dataRemains = false
}
} finally {
ostream.close()
}
zipStream.closeEntry()
entry = zipStream.getNextEntry
}
} finally {
zipStream.close()
}
}

val provider = new FsHistoryProvider(createTestConf())
val log1 = newLogFile("downloadApp1", Some("attempt1"), inProgress = false)
writeFile(log1, true, None,
SparkListenerApplicationStart("downloadApp1", Some("downloadApp1"), System
.currentTimeMillis() - 400, "test", Some("attempt1")),
SparkListenerApplicationEnd(System.currentTimeMillis() - 200)
)
val log1Buffer = getFileContent(log1)
val log2 = newLogFile("downloadApp1", Some("attempt2"), inProgress = false)
writeFile(log2, true, None,
SparkListenerApplicationStart("downloadApp1", Some("downloadApp1"), System
.currentTimeMillis() - 100, "test", Some("attempt2")),
SparkListenerApplicationEnd(System.currentTimeMillis())
)
val log2Buffer = getFileContent(log2)
provider.checkForLogs()
var inputDir: File = null
var outputDir: File = null
try {
inputDir = Utils.createTempDir()
Utils.chmod700(inputDir)
outputDir = Utils.createTempDir()
Utils.chmod700(outputDir)
provider.copyApplicationEventLogs("downloadApp1", inputDir)
val zipFile = inputDir.listFiles.headOption
zipFile.foreach { file =>
unzipToDir(file, outputDir)
}
var filesCompared = 0
outputDir.listFiles().foreach { outputFile =>
val bufferToCompare = if (outputFile.getName == log1.getName) log1Buffer else log2Buffer
val result = getFileContent(outputFile)
result should equal (bufferToCompare)
filesCompared += 1
}
assert(filesCompared === 2)
} finally {
if (inputDir != null) Utils.deleteRecursively(inputDir)
if (outputDir != null) Utils.deleteRecursively(outputDir)
}
}

  /**
   * Asks the provider to check for logs and calls a function to perform checks on the updated
   * app list. Example:
