From a0d564a102eb930f3c061d7827abbcea50ccbb68 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 14 Sep 2015 14:10:54 -0700 Subject: [PATCH 1/4] [SPARK-10522] [SQL] Nanoseconds of Timestamp in Parquet should be positive Otherwise Hive can't read it back correctly. Thanks to vanzin for reporting this. Author: Davies Liu Closes #8674 from davies/positive_nano. (cherry picked from commit 7e32387ae6303fd1cd32389d47df87170b841c67) Signed-off-by: Davies Liu --- .../spark/sql/catalyst/util/DateTimeUtils.scala | 12 +++++++----- .../sql/catalyst/util/DateTimeUtilsSuite.scala | 17 ++++++++--------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index d652fce3fd9b6..687ca000d12bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -42,6 +42,7 @@ object DateTimeUtils { final val SECONDS_PER_DAY = 60 * 60 * 24L final val MICROS_PER_SECOND = 1000L * 1000L final val NANOS_PER_SECOND = MICROS_PER_SECOND * 1000L + final val MICROS_PER_DAY = MICROS_PER_SECOND * SECONDS_PER_DAY final val MILLIS_PER_DAY = SECONDS_PER_DAY * 1000L @@ -190,13 +191,14 @@ object DateTimeUtils { /** * Returns Julian day and nanoseconds in a day from the number of microseconds + * + * Note: support timestamp since 4717 BC (without negative nanoseconds, compatible with Hive). 
*/ def toJulianDay(us: SQLTimestamp): (Int, Long) = { - val seconds = us / MICROS_PER_SECOND - val day = seconds / SECONDS_PER_DAY + JULIAN_DAY_OF_EPOCH - val secondsInDay = seconds % SECONDS_PER_DAY - val nanos = (us % MICROS_PER_SECOND) * 1000L - (day.toInt, secondsInDay * NANOS_PER_SECOND + nanos) + val julian_us = us + JULIAN_DAY_OF_EPOCH * MICROS_PER_DAY + val day = julian_us / MICROS_PER_DAY + val micros = julian_us % MICROS_PER_DAY + (day.toInt, micros * 1000L) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 1596bb79fa94b..6b9a11f0ff743 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -52,15 +52,14 @@ class DateTimeUtilsSuite extends SparkFunSuite { assert(ns === 0) assert(fromJulianDay(d, ns) == 0L) - val t = Timestamp.valueOf("2015-06-11 10:10:10.100") - val (d1, ns1) = toJulianDay(fromJavaTimestamp(t)) - val t1 = toJavaTimestamp(fromJulianDay(d1, ns1)) - assert(t.equals(t1)) - - val t2 = Timestamp.valueOf("2015-06-11 20:10:10.100") - val (d2, ns2) = toJulianDay(fromJavaTimestamp(t2)) - val t22 = toJavaTimestamp(fromJulianDay(d2, ns2)) - assert(t2.equals(t22)) + Seq(Timestamp.valueOf("2015-06-11 10:10:10.100"), + Timestamp.valueOf("2015-06-11 20:10:10.100"), + Timestamp.valueOf("1900-06-11 20:10:10.100")).foreach { t => + val (d, ns) = toJulianDay(fromJavaTimestamp(t)) + assert(ns > 0) + val t1 = toJavaTimestamp(fromJulianDay(d, ns)) + assert(t.equals(t1)) + } } test("SPARK-6785: java date conversion before and after epoch") { From 0e1c9d9ff7f9c8f9ae40179c19abbd1d211d142e Mon Sep 17 00:00:00 2001 From: Tom Graves Date: Mon, 14 Sep 2015 15:05:19 -0700 Subject: [PATCH 2/4] [SPARK-10549] scala 2.11 spark on yarn with security - Repl doesn't work Make this lazy 
so that the yarn mode can be set before the SecurityManager is created. Author: Tom Graves Author: Thomas Graves Closes #8719 from tgravescs/SPARK-10549. --- .../scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index be31eb2eda546..627148df80c11 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -35,7 +35,8 @@ object Main extends Logging { s.processArguments(List("-Yrepl-class-based", "-Yrepl-outdir", s"${outputDir.getAbsolutePath}", "-classpath", getAddedJars.mkString(File.pathSeparator)), true) - val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf)) + // the creation of SecurityManager has to be lazy so SPARK_YARN_MODE is set if needed + lazy val classServer = new HttpServer(conf, outputDir, new SecurityManager(conf)) var sparkContext: SparkContext = _ var sqlContext: SQLContext = _ var interp = new SparkILoop // this is a public var because tests reset it. From eb0cb25bb81a5aa271d2a0266e5a31b36d1fc071 Mon Sep 17 00:00:00 2001 From: Forest Fang Date: Mon, 14 Sep 2015 15:07:13 -0700 Subject: [PATCH 3/4] [SPARK-10543] [CORE] Peak Execution Memory Quantile should be Per-task Basis Read `PEAK_EXECUTION_MEMORY` using `update` to get the per-task partial value instead of the cumulative value. 
I tested with this workload: ```scala val size = 1000 val repetitions = 10 val data = sc.parallelize(1 to size, 5).map(x => (util.Random.nextInt(size / repetitions),util.Random.nextDouble)).toDF("key", "value") val res = data.toDF.groupBy("key").agg(sum("value")).count ``` Before: ![image](https://cloud.githubusercontent.com/assets/4317392/9828197/07dd6874-58b8-11e5-9bd9-6ba927c38b26.png) After: ![image](https://cloud.githubusercontent.com/assets/4317392/9828151/a5ddff30-58b7-11e5-8d31-eda5dc4eae79.png) Tasks view: ![image](https://cloud.githubusercontent.com/assets/4317392/9828199/17dc2b84-58b8-11e5-92a8-be89ce4d29d1.png) cc andrewor14 I would appreciate it if you could give feedback on this, since I think you introduced the display of this metric. Author: Forest Fang Closes #8726 from saurfang/stagepage. (cherry picked from commit fd1e8cddf2635c55fec2ac6e1f1c221c9685af0f) Signed-off-by: Andrew Or --- .../org/apache/spark/ui/jobs/StagePage.scala | 2 +- .../org/apache/spark/ui/StagePageSuite.scala | 29 ++++++++++++++----- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 4adc6596ba21c..2b71f55b7bb4f 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -368,7 +368,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val peakExecutionMemory = validTasks.map { case TaskUIData(info, _, _) => info.accumulables .find { acc => acc.name == InternalAccumulator.PEAK_EXECUTION_MEMORY } - .map { acc => acc.value.toLong } + .map { acc => acc.update.getOrElse("0").toLong } .getOrElse(0L) .toDouble } diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 3388c6dca81f1..86699e7f56953 100644 --- 
a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -23,7 +23,7 @@ import scala.xml.Node import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} -import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite, Success} +import org.apache.spark._ import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler._ import org.apache.spark.ui.jobs.{JobProgressListener, StagePage, StagesTab} @@ -47,6 +47,14 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { assert(html3.contains(targetString)) } + test("SPARK-10543: peak execution memory should be per-task rather than cumulative") { + val unsafeConf = "spark.sql.unsafe.enabled" + val conf = new SparkConf(false).set(unsafeConf, "true") + val html = renderStagePage(conf).toString().toLowerCase + // verify min/25/50/75/max show task value not cumulative values + assert(html.contains("10.0 b" * 5)) + } + /** * Render a stage page started with the given conf and return the HTML. * This also runs a dummy stage to populate the page with useful content. 
@@ -67,12 +75,19 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { // Simulate a stage in job progress listener val stageInfo = new StageInfo(0, 0, "dummy", 1, Seq.empty, Seq.empty, "details") - val taskInfo = new TaskInfo(0, 0, 0, 0, "0", "localhost", TaskLocality.ANY, false) - jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo)) - jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo)) - taskInfo.markSuccessful() - jobListener.onTaskEnd( - SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty)) + // Simulate two tasks to test PEAK_EXECUTION_MEMORY correctness + (1 to 2).foreach { + taskId => + val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false) + val peakExecutionMemory = 10 + taskInfo.accumulables += new AccumulableInfo(0, InternalAccumulator.PEAK_EXECUTION_MEMORY, + Some(peakExecutionMemory.toString), (peakExecutionMemory * taskId).toString, true) + jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo)) + jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo)) + taskInfo.markSuccessful() + jobListener.onTaskEnd( + SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, TaskMetrics.empty)) + } jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo)) page.render(request) } From 5db51f91131e867fd27cb6b0457a2698925cd920 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 14 Sep 2015 15:09:43 -0700 Subject: [PATCH 4/4] [SPARK-10564] ThreadingSuite: assertion failures in threads don't fail the test (round 2) This is a follow-up patch to #8723. I missed one case there. Author: Andrew Or Closes #8727 from andrewor14/fix-threading-suite. 
(cherry picked from commit 7b6c856367b9c36348e80e83959150da9656c4dd) Signed-off-by: Andrew Or --- .../org/apache/spark/ThreadingSuite.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala index cda2b245526f7..a96a4ce201c21 100644 --- a/core/src/test/scala/org/apache/spark/ThreadingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ThreadingSuite.scala @@ -147,12 +147,12 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { }.start() } sem.acquire(2) + throwable.foreach { t => throw t } if (ThreadingSuiteState.failed.get()) { logError("Waited 1 second without seeing runningThreads = 4 (it was " + ThreadingSuiteState.runningThreads.get() + "); failing test") fail("One or more threads didn't see runningThreads = 4") } - throwable.foreach { t => throw t } } test("set local properties in different thread") { @@ -178,8 +178,8 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { threads.foreach(_.start()) sem.acquire(5) - assert(sc.getLocalProperty("test") === null) throwable.foreach { t => throw t } + assert(sc.getLocalProperty("test") === null) } test("set and get local properties in parent-children thread") { @@ -207,15 +207,16 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { threads.foreach(_.start()) sem.acquire(5) + throwable.foreach { t => throw t } assert(sc.getLocalProperty("test") === "parent") assert(sc.getLocalProperty("Foo") === null) - throwable.foreach { t => throw t } } test("mutations to local properties should not affect submitted jobs (SPARK-6629)") { val jobStarted = new Semaphore(0) val jobEnded = new Semaphore(0) @volatile var jobResult: JobResult = null + var throwable: Option[Throwable] = None sc = new SparkContext("local", "test") sc.setJobGroup("originalJobGroupId", "description") @@ -232,14 
+233,19 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { // Create a new thread which will inherit the current thread's properties val thread = new Thread() { override def run(): Unit = { - assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId") - // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task try { - sc.parallelize(1 to 100).foreach { x => - Thread.sleep(100) + assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId") + // Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task + try { + sc.parallelize(1 to 100).foreach { x => + Thread.sleep(100) + } + } catch { + case s: SparkException => // ignored so that we don't print noise in test logs } } catch { - case s: SparkException => // ignored so that we don't print noise in test logs + case t: Throwable => + throwable = Some(t) } } } @@ -252,6 +258,7 @@ class ThreadingSuite extends SparkFunSuite with LocalSparkContext with Logging { // modification of the properties object should not affect the properties of running jobs sc.cancelJobGroup("originalJobGroupId") jobEnded.tryAcquire(10, TimeUnit.SECONDS) + throwable.foreach { t => throw t } assert(jobResult.isInstanceOf[JobFailed]) } }