diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 298a8201960d1..4ed91fb8885cc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.history
 
+import java.io.File
+
 import org.apache.spark.ui.SparkUI
 
 private[spark] case class ApplicationAttemptInfo(
@@ -62,4 +64,10 @@ private[history] abstract class ApplicationHistoryProvider {
    */
   def getConfig(): Map[String, String] = Map()
 
+  /**
+   * Get the event logs for the given application. The event logs are compressed into a zip file
+   * and copied into the directory passed in.
+   */
+  def copyApplicationEventLogs(appId: String, directory: File): Unit = { }
+
 }
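The new hook is a no-op by default, so existing providers compile unchanged and simply opt out of downloads. A provider that opts in is expected to leave a single zip file in the target directory; the eventLogs-<appId>.zip name below matches what FsHistoryProvider produces later in this patch. A self-contained sketch of that output layout, assuming plain local log files (the EventLogZipper object and zipLogs helper are illustrative names, not part of this patch):

    import java.io.{File, FileInputStream, FileOutputStream}
    import java.util.zip.{ZipEntry, ZipOutputStream}

    object EventLogZipper {
      // Writes one top-level zip entry per log file: the layout the REST endpoint expects.
      def zipLogs(logs: Seq[File], appId: String, directory: File): File = {
        val outFile = new File(directory, s"eventLogs-$appId.zip")
        val zip = new ZipOutputStream(new FileOutputStream(outFile))
        try {
          val buffer = new Array[Byte](64 * 1024)
          logs.foreach { log =>
            zip.putNextEntry(new ZipEntry(log.getName))
            val in = new FileInputStream(log)
            try {
              var read = in.read(buffer)
              while (read != -1) {
                zip.write(buffer, 0, read)
                read = in.read(buffer)
              }
            } finally {
              in.close()
            }
            zip.closeEntry()
          }
        } finally {
          zip.close()
        }
        outFile
      }
    }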
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 45c2be34c8680..837ddb632b54a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedInputStream, FileNotFoundException, IOException, InputStream}
+import java.io.{BufferedInputStream, File, FileNotFoundException, FileOutputStream,
+  IOException, InputStream}
 import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
 
@@ -219,6 +221,52 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  override def copyApplicationEventLogs(appId: String, directory: File): Unit = {
+    val buffer = new Array[Byte](64 * 1024)
+    /**
+     * Copy the data from the path specified into a new [[ZipEntry]] with the remotePath's name.
+     */
+    def copyToZipStream(remotePath: Path, zipStream: ZipOutputStream): Unit = {
+      val inputStream = fs.open(remotePath, 1 * 1024 * 1024) // 1MB buffer
+      try {
+        zipStream.putNextEntry(new ZipEntry(remotePath.getName))
+        var dataRemaining = true
+        while (dataRemaining) {
+          val length = inputStream.read(buffer)
+          if (length != -1) {
+            zipStream.write(buffer, 0, length)
+          } else {
+            dataRemaining = false
+          }
+        }
+        zipStream.closeEntry()
+      } finally {
+        inputStream.close()
+      }
+    }
+
+    applications.get(appId).foreach { appInfo =>
+      val outFile = new File(directory, s"eventLogs-$appId.zip")
+      val zipStream = new ZipOutputStream(new FileOutputStream(outFile))
+      try {
+        appInfo.attempts.foreach { attempt =>
+          val remotePath = new Path(logDir, attempt.logPath)
+          if (isLegacyLogDirectory(fs.getFileStatus(remotePath))) {
+            val filesIter = fs.listFiles(remotePath, true)
+            while (filesIter.hasNext) {
+              copyToZipStream(filesIter.next().getPath, zipStream)
+            }
+          } else {
+            copyToZipStream(remotePath, zipStream)
+          }
+        }
+        zipStream.finish()
+      } finally {
+        zipStream.close()
+      }
+    }
+  }
+
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
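Each attempt contributes one entry to eventLogs-<appId>.zip, named after its log file; legacy directory-based logs contribute one entry per file in the directory. A quick way to sanity-check a zip produced by the method above, using only java.util.zip (the helper name and the path passed to it are illustrative):

    import java.io.FileInputStream
    import java.util.zip.ZipInputStream

    // Prints one line per entry in a zip written by copyApplicationEventLogs.
    def listZipEntries(path: String): Unit = {
      val zis = new ZipInputStream(new FileInputStream(path))
      try {
        var entry = zis.getNextEntry
        while (entry != null) {
          println(entry.getName)
          zis.closeEntry()
          entry = zis.getNextEntry
        }
      } finally {
        zis.close()
      }
    }

    listZipEntries("/tmp/eventLogs-app-1234.zip")  // expect one name per attempt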
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 517cbe5176241..b302a790f3eae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.history
 
+import java.io.File
 import java.util.NoSuchElementException
 import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
 
@@ -25,7 +26,7 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.status.api.v1.{ApplicationInfo, ApplicationsListResource, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{SignalLogger, Utils}
@@ -125,7 +126,7 @@ class HistoryServer(
   def initialize() {
     attachPage(new HistoryPage(this))
 
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getApiServlet(this))
 
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
 
@@ -172,6 +173,10 @@ class HistoryServer(
     getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
   }
 
+  def copyEventLogsToDirectory(appId: String, destDir: File): Unit = {
+    provider.copyApplicationEventLogs(appId, destDir)
+  }
+
   /**
    * Returns the provider configuration to show in the listing page.
    *
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index eb26e9f99c70b..10b250e93eaec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master.ui
 
 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.status.api.v1.{ApplicationsListResource, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, ApplicationsListResource, UIRoot}
 import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.RpcUtils
@@ -47,7 +47,7 @@ class MasterWebUI(val master: Master, requestedPort: Int)
     attachPage(new HistoryNotFoundPage(this))
     attachPage(masterPage)
     attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getApiServlet(this))
     attachHandler(createRedirectHandler(
       "/app/kill", "/", masterPage.handleAppKillRequest, httpMethods = Set("POST")))
     attachHandler(createRedirectHandler(
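With the servlet mounted at /api instead of /json, everything under the REST root moves accordingly: the history server, the standalone master UI, and (below) the live application UI all serve /api/v1/... now, with the resource paths under /v1 unchanged. A quick smoke test against a locally running history server (host and port are assumptions for illustration; 18080 is the default value of spark.history.ui.port):

    import scala.io.Source

    // Fetches the application list from the history server as a JSON string.
    val json = Source.fromURL("http://localhost:18080/api/v1/applications").mkString
    println(json)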
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
similarity index 94%
rename from core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
rename to core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
index c3ec45f54681b..472becd7ebb08 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/JsonRootResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala
@@ -39,7 +39,7 @@ import org.apache.spark.ui.SparkUI
  * HistoryServerSuite.
  */
 @Path("/v1")
-private[v1] class JsonRootResource extends UIRootFromServletContext {
+private[v1] class ApiRootResource extends UIRootFromServletContext {
 
   @Path("applications")
   def getApplicationList(): ApplicationListResource = {
@@ -164,13 +164,22 @@ private[v1] class JsonRootResource extends UIRootFromServletContext {
     }
   }
 
+  @Path("applications/{appId}/{attemptId}/download")
+  def getEventLogs(
+      @PathParam("appId") appId: String,
+      @PathParam("attemptId") attemptId: String): EventLogDownloadResource = {
+    uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+      new EventLogDownloadResource(ui, appId)
+    }
+  }
+
 }
 
-private[spark] object JsonRootResource {
+private[spark] object ApiRootResource {
 
-  def getJsonServlet(uiRoot: UIRoot): ServletContextHandler = {
+  def getApiServlet(uiRoot: UIRoot): ServletContextHandler = {
     val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
-    jerseyContext.setContextPath("/json")
+    jerseyContext.setContextPath("/api")
     val holder:ServletHolder = new ServletHolder(classOf[ServletContainer])
     holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
       "com.sun.jersey.api.core.PackagesResourceConfig")
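The download route hangs off the same application/attempt sub-tree, so a complete URL has the form /api/v1/applications/<appId>/<attemptId>/download. A client-side sketch that streams the zip to disk (the host, app and attempt IDs, and output file name are illustrative only):

    import java.io.{BufferedInputStream, BufferedOutputStream, FileOutputStream}
    import java.net.URL

    val url = new URL("http://localhost:18080/api/v1/applications/app-1234/attempt1/download")
    val in = new BufferedInputStream(url.openStream())
    val out = new BufferedOutputStream(new FileOutputStream("eventLogs-app-1234.zip"))
    try {
      // Copy the HTTP response body to the local file in 8KB chunks.
      val buffer = new Array[Byte](8192)
      var read = in.read(buffer)
      while (read != -1) {
        out.write(buffer, 0, read)
        read = in.read(buffer)
      }
    } finally {
      in.close()
      out.close()
    }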
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
new file mode 100644
index 0000000000000..d7ccac0f8666a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/EventLogDownloadResource.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.io.{File, FileInputStream, InputStream, OutputStream}
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.{MediaType, MultivaluedMap}
+
+import org.apache.spark.deploy.history.HistoryServer
+import org.apache.spark.util.Utils
+
+@Produces(Array(MediaType.APPLICATION_OCTET_STREAM))
+private[v1] class EventLogDownloadResource(val uiRoot: UIRoot, val appId: String) {
+
+  @GET
+  def getEventLogs(headers: MultivaluedMap[String, AnyRef], outputStream: OutputStream): Unit = {
+    uiRoot match {
+      case hs: HistoryServer =>
+        val dir = Utils.createTempDir()
+        Utils.chmod700(dir)
+        try {
+          hs.copyEventLogsToDirectory(appId, dir)
+          dir.listFiles().headOption.foreach { zipFile =>
+            headers.add("Content-Length", zipFile.length().toString)
+            headers.add("Content-Type", MediaType.APPLICATION_OCTET_STREAM)
+            headers.add("Content-Disposition", s"attachment; filename=${zipFile.getName}")
+            var inputStream: InputStream = null
+            try {
+              inputStream = new FileInputStream(zipFile)
+              val buffer = new Array[Byte](1024 * 1024)
+              var remaining = true
+              while (remaining) {
+                val read = inputStream.read(buffer)
+                if (read != -1) {
+                  outputStream.write(buffer, 0, read)
+                } else {
+                  remaining = false
+                }
+              }
+              outputStream.flush()
+            } finally {
+              if (inputStream != null) {
+                inputStream.close()
+              }
+            }
+          }
+        } finally {
+          Utils.deleteRecursively(dir)
+        }
+      case _ => outputStream.write(
+        s"File download not available for application: $appId".getBytes("utf-8"))
+    }
+  }
+}
+
+private[v1] object EventLogDownloadResource {
+
+  def unapply(resource: EventLogDownloadResource): Option[(UIRoot, String)] = {
+    Some((resource.uiRoot, resource.appId))
+  }
+}
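Because the resource sets the headers before streaming, a well-behaved HTTP client saves the payload under the zip's own file name. For an application app-1234 (illustrative), the response carries headers along these lines, derived directly from the headers.add calls above:

    Content-Length: <zip size in bytes>
    Content-Type: application/octet-stream
    Content-Disposition: attachment; filename=eventLogs-app-1234.zip

When the UIRoot is not a HistoryServer (the master and live application UIs), the body is the short plain-text message from the fallback case instead.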
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
index 202a5191ad57d..16073e306169d 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/JacksonMessageWriter.scala
@@ -38,7 +38,7 @@ import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
  * Note that jersey automatically discovers this class based on its package and its annotations.
  */
 @Provider
-@Produces(Array(MediaType.APPLICATION_JSON))
+@Produces(Array(MediaType.APPLICATION_JSON, MediaType.APPLICATION_OCTET_STREAM))
 private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
 
   val mapper = new ObjectMapper() {
@@ -68,7 +68,9 @@ private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
       multivaluedMap: MultivaluedMap[String, AnyRef],
       outputStream: OutputStream): Unit = {
     t match {
       case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
+      case downloader @ EventLogDownloadResource(_, _) =>
+        downloader.getEventLogs(multivaluedMap, outputStream)
       case _ => mapper.writeValue(outputStream, t)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index bfe4a180e8a6f..c99b92a2f57b2 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui
 
 import java.util.Date
 
-import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo, JsonRootResource, UIRoot}
+import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo, UIRoot}
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
@@ -64,7 +64,7 @@ private[spark] class SparkUI private (
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
     attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
-    attachHandler(JsonRootResource.getJsonServlet(this))
+    attachHandler(ApiRootResource.getApiServlet(this))
     // This should be POST only, but, the YARN AM proxy won't proxy POSTs
     attachHandler(createRedirectHandler(
       "/stages/stage/kill", "/stages", stagesTab.handleKillRequest,
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index a0a0afa48833e..691bf100f0085 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
+import java.io.{BufferedOutputStream, File, FileInputStream, FileOutputStream, OutputStreamWriter}
 import java.net.URI
 import java.util.concurrent.TimeUnit
+import java.util.zip.ZipInputStream
 
 import scala.io.Source
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
@@ -335,6 +336,86 @@
   assert(!log2.exists())
 }
 
+  test("Event log copy") {
+
+    def getFileContent(file: File): Array[Byte] = {
+      val buff = new Array[Byte](file.length().toInt)
+      val in = new FileInputStream(file)
+      try {
+        in.read(buff)
+      } finally {
+        in.close()
+      }
+      buff
+    }
+
+    def unzipToDir(zipFile: File, outputDir: File): Unit = {
+      val zipStream = new ZipInputStream(new FileInputStream(zipFile))
+      try {
+        val buffer = new Array[Byte](128)
+        var entry = zipStream.getNextEntry
+        while (entry != null) {
+          val unzippedFile = new File(outputDir, entry.getName)
+          val ostream = new BufferedOutputStream(new FileOutputStream(unzippedFile))
+          try {
+            var dataRemains = true
+            while (dataRemains) {
+              val read = zipStream.read(buffer)
+              if (read > 0) ostream.write(buffer, 0, read) else dataRemains = false
+            }
+          } finally {
+            ostream.close()
+          }
+          zipStream.closeEntry()
+          entry = zipStream.getNextEntry
+        }
+      } finally {
+        zipStream.close()
+      }
+    }
+
+    val provider = new FsHistoryProvider(createTestConf())
+    val log1 = newLogFile("downloadApp1", Some("attempt1"), inProgress = false)
+    writeFile(log1, true, None,
+      SparkListenerApplicationStart("downloadApp1", Some("downloadApp1"),
+        System.currentTimeMillis() - 400, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(System.currentTimeMillis() - 200)
+    )
+    val log1Buffer = getFileContent(log1)
+    val log2 = newLogFile("downloadApp1", Some("attempt2"), inProgress = false)
+    writeFile(log2, true, None,
+      SparkListenerApplicationStart("downloadApp1", Some("downloadApp1"),
+        System.currentTimeMillis() - 100, "test", Some("attempt2")),
+      SparkListenerApplicationEnd(System.currentTimeMillis())
+    )
+    val log2Buffer = getFileContent(log2)
+    provider.checkForLogs()
+    var inputDir: File = null
+    var outputDir: File = null
+    try {
+      inputDir = Utils.createTempDir()
+      Utils.chmod700(inputDir)
+      outputDir = Utils.createTempDir()
+      Utils.chmod700(outputDir)
+      provider.copyApplicationEventLogs("downloadApp1", inputDir)
+      val zipFile = inputDir.listFiles.headOption
+      zipFile.foreach { file =>
+        unzipToDir(file, outputDir)
+      }
+      var filesCompared = 0
+      outputDir.listFiles().foreach { outputFile =>
+        val bufferToCompare = if (outputFile.getName == log1.getName) log1Buffer else log2Buffer
+        val result = getFileContent(outputFile)
+        result should equal (bufferToCompare)
+        filesCompared += 1
+      }
+      assert(filesCompared === 2)
+    } finally {
+      if (inputDir != null) Utils.deleteRecursively(inputDir)
+      if (outputDir != null) Utils.deleteRecursively(outputDir)
+    }
+  }
+
 /**
  * Asks the provider to check for logs and calls a function to perform checks on the updated
  * app list. Example: