
Commit 159e874

[SPARK-39385][SQL] Translate linear regression aggregate functions for pushdown

### What changes were proposed in this pull request?
Spark now supports many linear regression aggregate functions.
Because `REGR_AVGX`, `REGR_AVGY`, `REGR_COUNT`, `REGR_SXX` and `REGR_SXY` are replaced with other expressions at runtime, this PR only translates `REGR_INTERCEPT`, `REGR_R2`, `REGR_SLOPE` and `REGR_SXY` for pushdown.

With this change, users can override `JdbcDialect.compileAggregate` to compile the linear regression aggregate functions that a given database supports.
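
As a rough illustration (not part of this commit), a custom dialect override could look like the sketch below. The `MyRegressionDialect` object, the `jdbc:mydb` URL prefix and the use of `Expression.describe()` are assumptions made for the example; only `JdbcDialect.compileAggregate` and `GeneralAggregateFunc` come from the API mentioned above, and exact signatures may differ between Spark versions.

```scala
import org.apache.spark.sql.connector.expressions.aggregate.{AggregateFunc, GeneralAggregateFunc}
import org.apache.spark.sql.jdbc.JdbcDialect

// Hypothetical dialect showing where the translated REGR_* aggregates arrive.
case object MyRegressionDialect extends JdbcDialect {
  // "jdbc:mydb" is a made-up URL prefix for this sketch.
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:mydb")

  override def compileAggregate(aggFunction: AggregateFunc): Option[String] = {
    // Let the base dialect handle MIN/MAX/SUM/COUNT/AVG first.
    super.compileAggregate(aggFunction).orElse {
      aggFunction match {
        // The linear regression aggregates are translated as GeneralAggregateFunc,
        // identified by name; emit the database's own syntax if it supports them.
        case f: GeneralAggregateFunc
            if Set("REGR_INTERCEPT", "REGR_R2", "REGR_SLOPE", "REGR_SXY").contains(f.name()) =>
          assert(f.children().length == 2)
          val distinct = if (f.isDistinct) "DISTINCT " else ""
          Some(s"${f.name()}($distinct${f.children().map(_.describe()).mkString(", ")})")
        case _ => None
      }
    }
  }
}
```

Registering such a dialect with `JdbcDialects.registerDialect` would make it take effect for matching JDBC URLs.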

### Why are the changes needed?
Allow dialect implementations to compile `REGR_INTERCEPT`, `REGR_R2`, `REGR_SLOPE` and `REGR_SXY`.
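
For example, once a dialect can compile these functions, a query against a DS v2 JDBC catalog with aggregate pushdown enabled can have the regression aggregates evaluated by the database. The sketch below is illustrative only: the `h2` catalog name, the H2 URL/driver and the `test.employee` table are assumptions (borrowed from a typical JDBC test setup), not part of this commit.

```scala
import org.apache.spark.sql.SparkSession

object RegrPushdownExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("regr-pushdown-sketch")
      // A DS v2 JDBC catalog with aggregate pushdown enabled; the in-memory H2 URL is illustrative
      // and the referenced table is assumed to already exist in that database.
      .config("spark.sql.catalog.h2",
        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
      .config("spark.sql.catalog.h2.url", "jdbc:h2:mem:testdb")
      .config("spark.sql.catalog.h2.driver", "org.h2.Driver")
      .config("spark.sql.catalog.h2.pushDownAggregate", "true")
      .getOrCreate()

    // If the dialect compiles REGR_SLOPE/REGR_INTERCEPT, the aggregation runs in the database
    // and the pushed functions show up in the scan node of the physical plan.
    val df = spark.sql(
      "SELECT REGR_SLOPE(salary, bonus), REGR_INTERCEPT(salary, bonus) FROM h2.test.employee")
    df.explain()
    df.show()
  }
}
```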

### Does this PR introduce _any_ user-facing change?
No.
New feature.

### How was this patch tested?
New tests.

Closes #36773 from beliefer/SPARK-39385.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
a0x8o committed Jul 7, 2022
1 parent f3ab1ea commit 159e874
Showing 206 changed files with 6,666 additions and 3,809 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/WindowSpec.R
@@ -135,7 +135,7 @@ setMethod("orderBy",
#' An offset indicates the number of rows above or below the current row, the frame for the
#' current row starts or ends. For instance, given a row based sliding frame with a lower bound
#' offset of -1 and a upper bound offset of +2. The frame for row with index 5 would range from
#' index 4 to index 6.
#' index 4 to index 7.
#'
#' @param x a WindowSpec
#' @param start boundary start, inclusive.
10 changes: 5 additions & 5 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4015,8 +4015,8 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
expect_equal(currentDatabase(), "default")
expect_error(setCurrentDatabase("default"), NA)
expect_error(setCurrentDatabase("zxwtyswklpf"),
paste0("Error in setCurrentDatabase : analysis error - Database ",
"'zxwtyswklpf' does not exist"))
paste0("Error in setCurrentDatabase : no such database - Database ",
"'zxwtyswklpf' not found"))
dbs <- collect(listDatabases())
expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
expect_equal(which(dbs[, 1] == "default"), 1)
@@ -4050,12 +4050,12 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", {
f <- listFunctions()
expect_true(nrow(f) >= 200) # 250
expect_equal(colnames(f),
c("name", "database", "description", "className", "isTemporary"))
c("name", "catalog", "namespace", "description", "className", "isTemporary"))
expect_equal(take(orderBy(f, "className"), 1)$className,
"org.apache.spark.sql.catalyst.expressions.Abs")
expect_error(listFunctions("zxwtyswklpf_db"),
paste("Error in listFunctions : analysis error - Database",
"'zxwtyswklpf_db' does not exist"))
paste("Error in listFunctions : no such database - Database",
"'zxwtyswklpf_db' not found"))

# recoverPartitions does not work with temporary view
expect_error(recoverPartitions("cars"),
4 changes: 2 additions & 2 deletions R/run-tests.sh
@@ -30,9 +30,9 @@ if [[ $(echo $SPARK_AVRO_JAR_PATH | wc -l) -eq 1 ]]; then
fi

if [ -z "$SPARK_JARS" ]; then
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configurationFile=file:$FWDIR/log4j2.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configurationFile=file:$FWDIR/log4j2.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Xss4M" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Xss4M" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
else
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configurationFile=file:$FWDIR/log4j2.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configurationFile=file:$FWDIR/log4j2.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Xss4M" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true -Xss4M" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
fi

FAILED=$((PIPESTATUS[0]||$FAILED))
@@ -119,6 +119,8 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCTest

override def supportsIndex: Boolean = true

override def supportListIndexes: Boolean = true

override def indexOptions: String = "KEY_BLOCK_SIZE=10"

testVarPop()
@@ -197,6 +197,8 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def supportsIndex: Boolean = false

def supportListIndexes: Boolean = false

def indexOptions: String = ""

test("SPARK-36895: Test INDEX Using SQL") {
@@ -219,11 +221,21 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
s" The supported Index Types are:"))

sql(s"CREATE index i1 ON $catalogName.new_table USING BTREE (col1)")
assert(jdbcTable.indexExists("i1"))
if (supportListIndexes) {
val indexes = jdbcTable.listIndexes()
assert(indexes.size == 1)
assert(indexes.head.indexName() == "i1")
}

sql(s"CREATE index i2 ON $catalogName.new_table (col2, col3, col5)" +
s" OPTIONS ($indexOptions)")

assert(jdbcTable.indexExists("i1") == true)
assert(jdbcTable.indexExists("i2") == true)
assert(jdbcTable.indexExists("i2"))
if (supportListIndexes) {
val indexes = jdbcTable.listIndexes()
assert(indexes.size == 2)
assert(indexes.map(_.indexName()).sorted === Array("i1", "i2"))
}

// This should pass without exception
sql(s"CREATE index IF NOT EXISTS i1 ON $catalogName.new_table (col1)")
Expand All @@ -234,10 +246,18 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
assert(m.contains("Failed to create index i1 in new_table"))

sql(s"DROP index i1 ON $catalogName.new_table")
sql(s"DROP index i2 ON $catalogName.new_table")

assert(jdbcTable.indexExists("i1") == false)
if (supportListIndexes) {
val indexes = jdbcTable.listIndexes()
assert(indexes.size == 1)
assert(indexes.head.indexName() == "i2")
}

sql(s"DROP index i2 ON $catalogName.new_table")
assert(jdbcTable.indexExists("i2") == false)
if (supportListIndexes) {
assert(jdbcTable.listIndexes().isEmpty)
}

// This should pass without exception
sql(s"DROP index IF EXISTS i1 ON $catalogName.new_table")
13 changes: 13 additions & 0 deletions core/benchmarks/MapStatusesConvertBenchmark-jdk11-results.txt
@@ -0,0 +1,13 @@
================================================================================================
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 11.0.15+10-LTS on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1324 1333 7 0.0 1324283680.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2650 2670 32 0.0 2650318387.0 0.5X
Num Maps: 50000 Fetch partitions:1500 4018 4059 53 0.0 4017921009.0 0.3X


13 changes: 13 additions & 0 deletions core/benchmarks/MapStatusesConvertBenchmark-jdk17-results.txt
@@ -0,0 +1,13 @@
================================================================================================
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 17.0.3+7-LTS on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1092 1104 22 0.0 1091691925.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2172 2192 29 0.0 2171702137.0 0.5X
Num Maps: 50000 Fetch partitions:1500 3268 3291 27 0.0 3267904436.0 0.3X


10 changes: 5 additions & 5 deletions core/benchmarks/MapStatusesConvertBenchmark-results.txt
@@ -2,12 +2,12 @@
MapStatuses Convert Benchmark
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1025-azure
Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz
OpenJDK 64-Bit Server VM 1.8.0_332-b09 on Linux 5.13.0-1031-azure
Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz
MapStatuses Convert: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
Num Maps: 50000 Fetch partitions:500 1330 1359 26 0.0 1329827185.0 1.0X
Num Maps: 50000 Fetch partitions:1000 2648 2666 20 0.0 2647944453.0 0.5X
Num Maps: 50000 Fetch partitions:1500 4155 4436 383 0.0 4154563448.0 0.3X
Num Maps: 50000 Fetch partitions:500 1001 1033 36 0.0 1000638934.0 1.0X
Num Maps: 50000 Fetch partitions:1000 1699 1705 7 0.0 1699358972.0 0.6X
Num Maps: 50000 Fetch partitions:1500 2647 2855 314 0.0 2646904255.0 0.4X


28 changes: 16 additions & 12 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -373,12 +373,6 @@ class SparkContext(config: SparkConf) extends Logging {
| stop() method to be called. |
* ------------------------------------------------------------------------------------- */

private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
}

/** Control our logLevel. This overrides any user-defined log settings.
* @param logLevel The desired log level as a string.
* Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
@@ -530,12 +524,7 @@ class SparkContext(config: SparkConf) extends Logging {
}
}

_executorMemory = _conf.getOption(EXECUTOR_MEMORY.key)
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)
_executorMemory = SparkContext.executorMemoryInMb(_conf)

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
@@ -2890,6 +2879,21 @@ object SparkContext extends Logging {
}
}

private[spark] def executorMemoryInMb(conf: SparkConf): Int = {
conf.getOption(EXECUTOR_MEMORY.key)
.orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(1024)
}

private def warnSparkMem(value: String): String = {
logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
"deprecated, please use spark.executor.memory instead.")
value
}

/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -78,7 +78,7 @@ class SparkEnv (
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata =
CacheBuilder.newBuilder().softValues().build[String, AnyRef]().asMap()
CacheBuilder.newBuilder().maximumSize(1000).softValues().build[String, AnyRef]().asMap()

private[spark] var driverTmpDir: Option[String] = None

@@ -697,7 +697,7 @@ private[spark] class PythonAccumulatorV2(
@transient private val serverHost: String,
private val serverPort: Int,
private val secretToken: String)
extends CollectionAccumulator[Array[Byte]] with Logging{
extends CollectionAccumulator[Array[Byte]] with Logging {

Utils.checkHost(serverHost)

@@ -48,7 +48,7 @@ private[spark] object SerDeUtil extends Logging {
// This should be called before trying to unpickle array.array from Python
// In cluster mode, this should be put in closure
def initialize(): Unit = {
synchronized{
synchronized {
if (!initialized) {
Unpickler.registerConstructor("__builtin__", "bytearray", new ByteArrayConstructor())
Unpickler.registerConstructor("builtins", "bytearray", new ByteArrayConstructor())
@@ -19,23 +19,28 @@ package org.apache.spark.deploy

import java.net.URI

import org.apache.spark.resource.ResourceRequirement
import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources

private[spark] case class ApplicationDescription(
name: String,
maxCores: Option[Int],
memoryPerExecutorMB: Int,
command: Command,
appUiUrl: String,
defaultProfile: ResourceProfile,
eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
eventLogCodec: Option[String] = None,
coresPerExecutor: Option[Int] = None,
// number of executors this application wants to start with,
// only used if dynamic allocation is enabled
initialExecutorLimit: Option[Int] = None,
user: String = System.getProperty("user.name", "<unknown>"),
resourceReqsPerExecutor: Seq[ResourceRequirement] = Seq.empty) {
user: String = System.getProperty("user.name", "<unknown>")) {

def memoryPerExecutorMB: Int = defaultProfile.getExecutorMemory.map(_.toInt).getOrElse(1024)
def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
def resourceReqsPerExecutor: Seq[ResourceRequirement] =
ResourceUtils.executorResourceRequestToRequirement(
getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))

override def toString: String = "ApplicationDescription(" + name + ")"
}
@@ -24,7 +24,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.deploy.master.DriverState.DriverState
import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
import org.apache.spark.util.Utils

@@ -166,6 +166,7 @@ private[deploy] object DeployMessages {
masterUrl: String,
appId: String,
execId: Int,
rpId: Int,
appDesc: ApplicationDescription,
cores: Int,
memory: Int,
@@ -196,7 +197,7 @@ private[deploy] object DeployMessages {

case class MasterChangeAcknowledged(appId: String)

case class RequestExecutors(appId: String, requestedTotal: Int)
case class RequestExecutors(appId: String, resourceProfileToTotalExecs: Map[ResourceProfile, Int])

case class KillExecutors(appId: String, executorIds: Seq[String])

@@ -25,10 +25,13 @@ package org.apache.spark.deploy
private[deploy] class ExecutorDescription(
val appId: String,
val execId: Int,
val rpId: Int,
val cores: Int,
val memoryMb: Int,
val state: ExecutorState.Value)
extends Serializable {

override def toString: String =
"ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
"ExecutorState(appId=%s, execId=%d, rpId=%d, cores=%d, memoryMb=%d state=%s)"
.format(appId, execId, rpId, cores, memoryMb, state)
}
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.internal.Logging
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.rpc._
import org.apache.spark.scheduler.ExecutorDecommissionInfo
import org.apache.spark.util.{RpcUtils, ThreadUtils}
@@ -294,14 +295,25 @@ private[spark] class StandaloneAppClient(
}

/**
* Request executors from the Master by specifying the total number desired,
* including existing pending and running executors.
* Request executors for default resource profile from the Master by specifying the
* total number desired, including existing pending and running executors.
*
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
requestTotalExecutors(Map(appDescription.defaultProfile -> requestedTotal))
}

/**
* Request executors from the Master by specifying the total number desired for each
* resource profile, including existing pending and running executors.
*
* @return whether the request is acknowledged.
*/
def requestTotalExecutors(
resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Future[Boolean] = {
if (endpoint.get != null && appId.get != null) {
endpoint.get.ask[Boolean](RequestExecutors(appId.get, requestedTotal))
endpoint.get.ask[Boolean](RequestExecutors(appId.get, resourceProfileToTotalExecs))
} else {
logWarning("Attempted to request executors before driver fully initialized.")
Future.successful(false)
@@ -21,7 +21,6 @@ import scala.io.{Codec, Source}
import scala.util.control.NonFatal

import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods.parse

import org.apache.spark.deploy.history.EventFilter.FilterStatistics
import org.apache.spark.internal.Logging
@@ -81,7 +80,7 @@ private[spark] object EventFilter extends Logging {
lines.zipWithIndex.foreach { case (line, lineNum) =>
try {
val event = try {
Some(JsonProtocol.sparkEventFromJson(parse(line)))
Some(JsonProtocol.sparkEventFromJson(line))
} catch {
// ignore any exception occurred from unidentified json
case NonFatal(_) =>