From 4fa25d161c96163a42b24b9ee74677b792531fb2 Mon Sep 17 00:00:00 2001 From: Alex Date: Mon, 11 Jul 2022 10:02:11 -0500 Subject: [PATCH] [SPARK-39667][SQL] Add another workaround when there is not enough memory to build and broadcast the table ### What changes were proposed in this pull request? This PR adds `ANALYZE TABLE tbl COMPUTE STATISTICS;` as a workaround when there is not enough memory to build and broadcast the table. ### Why are the changes needed? The current workaround has the following disadvantages: 1. Setting `spark.sql.autoBroadcastJoinThreshold` to -1 will disable all broadcast joins. 2. We don't know what specific value `spark.driver.memory` should be set to. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual testing: ``` Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value or analyze these tables through: ANALYZE TABLE `tpds5t`.`item` COMPUTE STATISTICS;." ``` Closes #37069 from wangyum/SPARK-39667. 
Authored-by: Yuming Wang Signed-off-by: Yuming Wang --- R/pkg/NAMESPACE | 4 + R/pkg/R/catalog.R | 159 +++++++++++++++++- R/pkg/R/functions.R | 6 +- R/pkg/pkgdown/_pkgdown_template.yml | 4 + R/pkg/tests/fulltests/test_sparkSQL.R | 80 ++++++++- .../apache/spark/util/kvstore/KVTypeInfo.java | 2 - .../spark/network/crypto/TransportCipher.java | 21 ++- .../apache/spark/api/python/PythonRDD.scala | 2 +- .../serializer/SerializationDebugger.scala | 8 +- .../shuffle/sort/SortShuffleWriter.scala | 4 +- .../apache/spark/storage/BlockManager.scala | 2 +- .../storage/BlockManagerDecommissioner.scala | 2 +- .../org/apache/spark/util/SizeEstimator.scala | 2 +- dev/appveyor-install-dependencies.ps1 | 2 +- dev/deps/spark-deps-hadoop-2-hive-2.3 | 4 +- dev/deps/spark-deps-hadoop-3-hive-2.3 | 6 +- dev/infra/Dockerfile | 3 + pom.xml | 4 +- project/build.properties | 2 +- python/pyspark/sql/functions.py | 10 +- python/pyspark/sql/tests/test_functions.py | 56 ++++++ .../spark/repl/ExecutorClassLoader.scala | 8 +- .../aggregate/linearRegression.scala | 4 +- .../sql/errors/QueryExecutionErrors.scala | 13 +- .../org/apache/spark/sql/api/r/SQLUtils.scala | 2 +- .../exchange/BroadcastExchangeExec.scala | 5 +- .../org/apache/spark/sql/functions.scala | 3 +- .../spark/sql/CharVarcharTestSuite.scala | 25 ++- 28 files changed, 367 insertions(+), 76 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 570f721ab4..3937791421 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -476,8 +476,11 @@ export("as.DataFrame", "createTable", "currentCatalog", "currentDatabase", + "databaseExists", "dropTempTable", "dropTempView", + "getDatabase", + "getTable", "listCatalogs", "listColumns", "listDatabases", @@ -503,6 +506,7 @@ export("as.DataFrame", "spark.getSparkFiles", "sql", "str", + "tableExists", "tableToDF", "tableNames", "tables", diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R index b10f73fb34..680415ea6c 100644 --- a/R/pkg/R/catalog.R +++ b/R/pkg/R/catalog.R @@ -118,6 +118,7 @@ 
createExternalTable <- function(tableName, path = NULL, source = NULL, schema = #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @param path (optional) the path of files to load. #' @param source (optional) the name of the data source. #' @param schema (optional) the schema of the data required for some data sources. @@ -129,7 +130,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema = #' sparkR.session() #' df <- createTable("myjson", path="path/to/json", source="json", schema) #' -#' createTable("people", source = "json", schema = schema) +#' createTable("spark_catalog.default.people", source = "json", schema = schema) #' insertInto(df, "people") #' } #' @name createTable @@ -160,6 +161,7 @@ createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, .. #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @return SparkDataFrame #' @rdname cacheTable #' @examples @@ -184,6 +186,7 @@ cacheTable <- function(tableName) { #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @return SparkDataFrame #' @rdname uncacheTable #' @examples @@ -275,13 +278,14 @@ dropTempView <- function(viewName) { #' Returns a SparkDataFrame containing names of tables in the given database. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. 
#' @return a SparkDataFrame #' @rdname tables #' @seealso \link{listTables} #' @examples #'\dontrun{ #' sparkR.session() -#' tables("hive") +#' tables("spark_catalog.hive") #' } #' @name tables #' @note tables since 1.4.0 @@ -295,12 +299,13 @@ tables <- function(databaseName = NULL) { #' Returns the names of tables in the given database as an array. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. #' @return a list of table names #' @rdname tableNames #' @examples #'\dontrun{ #' sparkR.session() -#' tableNames("hive") +#' tableNames("spark_catalog.hive") #' } #' @name tableNames #' @note tableNames since 1.4.0 @@ -353,6 +358,28 @@ setCurrentDatabase <- function(databaseName) { invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName)) } +#' Checks if the database with the specified name exists. +#' +#' Checks if the database with the specified name exists. +#' +#' @param databaseName name of the database, allowed to be qualified with catalog name +#' @rdname databaseExists +#' @name databaseExists +#' @examples +#' \dontrun{ +#' sparkR.session() +#' databaseExists("spark_catalog.default") +#' } +#' @note since 3.4.0 +databaseExists <- function(databaseName) { + sparkSession <- getSparkSession() + if (class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "databaseExists", databaseName) +} + #' Returns a list of databases available #' #' Returns a list of databases available. @@ -372,12 +399,54 @@ listDatabases <- function() { dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF")) } +#' Get the database with the specified name +#' +#' Get the database with the specified name +#' +#' @param databaseName name of the database, allowed to be qualified with catalog name +#' @return A named list. 
+#' @rdname getDatabase +#' @name getDatabase +#' @examples +#' \dontrun{ +#' sparkR.session() +#' db <- getDatabase("default") +#' } +#' @note since 3.4.0 +getDatabase <- function(databaseName) { + sparkSession <- getSparkSession() + if (class(databaseName) != "character") { + stop("databaseName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jdb <- handledCallJMethod(catalog, "getDatabase", databaseName) + + ret <- list(name = callJMethod(jdb, "name")) + jcata <- callJMethod(jdb, "catalog") + if (is.null(jcata)) { + ret$catalog <- NA + } else { + ret$catalog <- jcata + } + + jdesc <- callJMethod(jdb, "description") + if (is.null(jdesc)) { + ret$description <- NA + } else { + ret$description <- jdesc + } + + ret$locationUri <- callJMethod(jdb, "locationUri") + ret +} + #' Returns a list of tables or views in the specified database #' #' Returns a list of tables or views in the specified database. #' This includes all temporary views. #' #' @param databaseName (optional) name of the database +#' The database name can be qualified with catalog name since 3.4.0. #' @return a SparkDataFrame of the list of tables. #' @rdname listTables #' @name listTables @@ -386,7 +455,7 @@ listDatabases <- function() { #' \dontrun{ #' sparkR.session() #' listTables() -#' listTables("default") +#' listTables("spark_catalog.default") #' } #' @note since 2.2.0 listTables <- function(databaseName = NULL) { @@ -403,6 +472,78 @@ listTables <- function(databaseName = NULL) { dataFrame(callJMethod(jdst, "toDF")) } +#' Checks if the table with the specified name exists. +#' +#' Checks if the table with the specified name exists. 
+#' +#' @param tableName name of the table, allowed to be qualified with catalog name +#' @rdname tableExists +#' @name tableExists +#' @examples +#' \dontrun{ +#' sparkR.session() +#' tableExists("spark_catalog.default.myTable") +#' } +#' @note since 3.4.0 +tableExists <- function(tableName) { + sparkSession <- getSparkSession() + if (class(tableName) != "character") { + stop("tableName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + callJMethod(catalog, "tableExists", tableName) +} + +#' Get the table with the specified name +#' +#' Get the table with the specified name +#' +#' @param tableName the qualified or unqualified name that designates a table, allowed to be +#' qualified with catalog name +#' @return A named list. +#' @rdname getTable +#' @name getTable +#' @examples +#' \dontrun{ +#' sparkR.session() +#' tbl <- getTable("spark_catalog.default.myTable") +#' } +#' @note since 3.4.0 +getTable <- function(tableName) { + sparkSession <- getSparkSession() + if (class(tableName) != "character") { + stop("tableName must be a string.") + } + catalog <- callJMethod(sparkSession, "catalog") + jtbl <- handledCallJMethod(catalog, "getTable", tableName) + + ret <- list(name = callJMethod(jtbl, "name")) + jcata <- callJMethod(jtbl, "catalog") + if (is.null(jcata)) { + ret$catalog <- NA + } else { + ret$catalog <- jcata + } + + jns <- callJMethod(jtbl, "namespace") + if (is.null(jns)) { + ret$namespace <- NA + } else { + ret$namespace <- jns + } + + jdesc <- callJMethod(jtbl, "description") + if (is.null(jdesc)) { + ret$description <- NA + } else { + ret$description <- jdesc + } + + ret$tableType <- callJMethod(jtbl, "tableType") + ret$isTemporary <- callJMethod(jtbl, "isTemporary") + ret +} + #' Returns a list of columns for the given table/view in the specified database #' #' Returns a list of columns for the given table/view in the specified database. 
@@ -410,6 +551,8 @@ listTables <- function(databaseName = NULL) { #' @param tableName the qualified or unqualified name that designates a table/view. If no database #' identifier is provided, it refers to a table/view in the current database. #' If \code{databaseName} parameter is specified, this must be an unqualified name. +#' The table name can be qualified with catalog name since 3.4.0, when databaseName +#' is NULL. #' @param databaseName (optional) name of the database #' @return a SparkDataFrame of the list of column descriptions. #' @rdname listColumns @@ -417,7 +560,7 @@ listTables <- function(databaseName = NULL) { #' @examples #' \dontrun{ #' sparkR.session() -#' listColumns("mytable") +#' listColumns("spark_catalog.default.mytable") #' } #' @note since 2.2.0 listColumns <- function(tableName, databaseName = NULL) { @@ -470,12 +613,13 @@ listFunctions <- function(databaseName = NULL) { #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. #' @rdname recoverPartitions #' @name recoverPartitions #' @examples #' \dontrun{ #' sparkR.session() -#' recoverPartitions("myTable") +#' recoverPartitions("spark_catalog.default.myTable") #' } #' @note since 2.2.0 recoverPartitions <- function(tableName) { @@ -496,12 +640,13 @@ recoverPartitions <- function(tableName) { #' #' @param tableName the qualified or unqualified name that designates a table. If no database #' identifier is provided, it refers to a table in the current database. +#' The table name can be fully qualified with catalog name since 3.4.0. 
#' @rdname refreshTable #' @name refreshTable #' @examples #' \dontrun{ #' sparkR.session() -#' refreshTable("myTable") +#' refreshTable("spark_catalog.default.myTable") #' } #' @note since 2.2.0 refreshTable <- function(tableName) { diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 1377f0daa7..d772c9bd4e 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -3256,7 +3256,8 @@ setMethod("format_string", signature(format = "character", x = "Column"), #' tmp <- mutate(df, to_unix = unix_timestamp(df$time), #' to_unix2 = unix_timestamp(df$time, 'yyyy-MM-dd HH'), #' from_unix = from_unixtime(unix_timestamp(df$time)), -#' from_unix2 = from_unixtime(unix_timestamp(df$time), 'yyyy-MM-dd HH:mm')) +#' from_unix2 = from_unixtime(unix_timestamp(df$time), 'yyyy-MM-dd HH:mm'), +#' timestamp_from_unix = timestamp_seconds(unix_timestamp(df$time))) #' head(tmp)} #' @note from_unixtime since 1.5.0 setMethod("from_unixtime", signature(x = "Column"), @@ -4854,7 +4855,8 @@ setMethod("current_timestamp", }) #' @details -#' \code{timestamp_seconds}: Creates timestamp from the number of seconds since UTC epoch. +#' \code{timestamp_seconds}: Converts the number of seconds from the Unix epoch +#' (1970-01-01T00:00:00Z) to a timestamp. 
#' #' @rdname column_datetime_functions #' @aliases timestamp_seconds timestamp_seconds,Column-method diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml index d487b51ec5..df93f200ab 100644 --- a/R/pkg/pkgdown/_pkgdown_template.yml +++ b/R/pkg/pkgdown/_pkgdown_template.yml @@ -263,8 +263,11 @@ reference: - contents: - currentCatalog - currentDatabase + - databaseExists - dropTempTable - dropTempView + - getDatabase + - getTable - listCatalogs - listColumns - listDatabases @@ -275,6 +278,7 @@ reference: - recoverPartitions - setCurrentCatalog - setCurrentDatabase + - tableExists - tableNames - tables - uncacheTable diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 9586d8b45a..85eca6b510 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -673,6 +673,22 @@ test_that("test tableNames and tables", { tables <- listTables() expect_equal(count(tables), count + 0) + + count2 <- count(listTables()) + schema <- structType(structField("name", "string"), structField("age", "integer"), + structField("height", "float")) + createTable("people", source = "json", schema = schema) + + expect_equal(length(tableNames()), count2 + 1) + expect_equal(length(tableNames("default")), count2 + 1) + expect_equal(length(tableNames("spark_catalog.default")), count2 + 1) + + tables <- listTables() + expect_equal(count(tables), count2 + 1) + expect_equal(count(tables()), count(tables)) + expect_equal(count(tables("default")), count2 + 1) + expect_equal(count(tables("spark_catalog.default")), count2 + 1) + sql("DROP TABLE IF EXISTS people") }) test_that( @@ -696,16 +712,27 @@ test_that( expect_true(dropTempView("dfView")) }) -test_that("test cache, uncache and clearCache", { - df <- read.json(jsonPath) - createOrReplaceTempView(df, "table1") - cacheTable("table1") - uncacheTable("table1") +test_that("test tableExists, cache, uncache and clearCache", { + schema <- 
structType(structField("name", "string"), structField("age", "integer"), + structField("height", "float")) + createTable("table1", source = "json", schema = schema) + + cacheTable("default.table1") + uncacheTable("spark_catalog.default.table1") clearCache() - expect_true(dropTempView("table1")) expect_error(uncacheTable("zxwtyswklpf"), "Error in uncacheTable : analysis error - Table or view not found: zxwtyswklpf") + + expect_true(tableExists("table1")) + expect_true(tableExists("default.table1")) + expect_true(tableExists("spark_catalog.default.table1")) + + sql("DROP TABLE IF EXISTS spark_catalog.default.table1") + + expect_false(tableExists("table1")) + expect_false(tableExists("default.table1")) + expect_false(tableExists("spark_catalog.default.table1")) }) test_that("insertInto() on a registered table", { @@ -1342,7 +1369,7 @@ test_that("test HiveContext", { schema <- structType(structField("name", "string"), structField("age", "integer"), structField("height", "float")) - createTable("people", source = "json", schema = schema) + createTable("spark_catalog.default.people", source = "json", schema = schema) df <- read.df(jsonPathNa, "json", schema) insertInto(df, "people") expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16)) @@ -3411,6 +3438,8 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", { "Length of type vector should match the number of columns for SparkDataFrame") expect_error(coltypes(df) <- c("environment", "list"), "Only atomic type is supported for column types") + + dropTempView("dfView") }) test_that("Method str()", { @@ -3450,6 +3479,8 @@ test_that("Method str()", { # Test utils:::str expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris))) + + dropTempView("irisView") }) test_that("Histogram", { @@ -4022,20 +4053,32 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", { catalogs <- collect(listCatalogs()) }) -test_that("catalog APIs, 
currentDatabase, setCurrentDatabase, listDatabases", { +test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases, getDatabase", { expect_equal(currentDatabase(), "default") expect_error(setCurrentDatabase("default"), NA) expect_error(setCurrentDatabase("zxwtyswklpf"), paste0("Error in setCurrentDatabase : no such database - Database ", "'zxwtyswklpf' not found")) + + expect_true(databaseExists("default")) + expect_true(databaseExists("spark_catalog.default")) + expect_false(databaseExists("some_db")) + expect_false(databaseExists("spark_catalog.some_db")) + dbs <- collect(listDatabases()) expect_equal(names(dbs), c("name", "catalog", "description", "locationUri")) expect_equal(which(dbs[, 1] == "default"), 1) + + db <- getDatabase("spark_catalog.default") + expect_equal(db$name, "default") + expect_equal(db$catalog, "spark_catalog") }) -test_that("catalog APIs, listTables, listColumns, listFunctions", { +test_that("catalog APIs, listTables, listColumns, listFunctions, getTable", { tb <- listTables() count <- count(tables()) + expect_equal(nrow(listTables("default")), count) + expect_equal(nrow(listTables("spark_catalog.default")), count) expect_equal(nrow(tb), count) expect_equal(colnames(tb), c("name", "catalog", "namespace", "description", "tableType", "isTemporary")) @@ -4075,7 +4118,26 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", { expect_error(refreshTable("cars"), NA) expect_error(refreshByPath("/"), NA) + view <- getTable("cars") + expect_equal(view$name, "cars") + expect_equal(view$tableType, "TEMPORARY") + expect_true(view$isTemporary) + dropTempView("cars") + + schema <- structType(structField("name", "string"), structField("age", "integer"), + structField("height", "float")) + createTable("default.people", source = "json", schema = schema) + + tbl <- getTable("spark_catalog.default.people") + expect_equal(tbl$name, "people") + expect_equal(tbl$catalog, "spark_catalog") + expect_equal(length(tbl$namespace), 1) + 
expect_equal(tbl$namespace[[1]], "default") + expect_equal(tbl$tableType, "MANAGED") + expect_false(tbl$isTemporary) + + sql("DROP TABLE IF EXISTS people") }) test_that("assert_true, raise_error", { diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java index a7e5831846..a15d07cf59 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java @@ -48,7 +48,6 @@ public KVTypeInfo(Class type) { checkIndex(idx, indices); f.setAccessible(true); indices.put(idx.value(), idx); - f.setAccessible(true); accessors.put(idx.value(), new FieldAccessor(f)); } } @@ -61,7 +60,6 @@ public KVTypeInfo(Class type) { "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName()); m.setAccessible(true); indices.put(idx.value(), idx); - m.setAccessible(true); accessors.put(idx.value(), new MethodAccessor(m)); } } diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java index 36ca73f6ac..b507f911fe 100644 --- a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java +++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java @@ -109,13 +109,15 @@ public void addToChannel(Channel ch) throws IOException { @VisibleForTesting static class EncryptionHandler extends ChannelOutboundHandlerAdapter { - private final ByteArrayWritableChannel byteChannel; + private final ByteArrayWritableChannel byteEncChannel; private final CryptoOutputStream cos; + private final ByteArrayWritableChannel byteRawChannel; private boolean isCipherValid; EncryptionHandler(TransportCipher cipher) throws IOException { - byteChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); 
- cos = cipher.createOutputStream(byteChannel); + byteEncChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); + cos = cipher.createOutputStream(byteEncChannel); + byteRawChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); isCipherValid = true; } @@ -127,7 +129,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @VisibleForTesting EncryptedMessage createEncryptedMessage(Object msg) { - return new EncryptedMessage(this, cos, msg, byteChannel); + return new EncryptedMessage(this, cos, msg, byteEncChannel, byteRawChannel); } @Override @@ -223,8 +225,8 @@ static class EncryptedMessage extends AbstractFileRegion { // Due to streaming issue CRYPTO-125: https://issues.apache.org/jira/browse/CRYPTO-125, it has // to utilize two helper ByteArrayWritableChannel for streaming. One is used to receive raw data // from upper handler, another is used to store encrypted data. - private ByteArrayWritableChannel byteEncChannel; - private ByteArrayWritableChannel byteRawChannel; + private final ByteArrayWritableChannel byteEncChannel; + private final ByteArrayWritableChannel byteRawChannel; private ByteBuffer currentEncrypted; @@ -232,7 +234,8 @@ static class EncryptedMessage extends AbstractFileRegion { EncryptionHandler handler, CryptoOutputStream cos, Object msg, - ByteArrayWritableChannel ch) { + ByteArrayWritableChannel byteEncChannel, + ByteArrayWritableChannel byteRawChannel) { Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion, "Unrecognized message type: %s", msg.getClass().getName()); this.handler = handler; @@ -240,9 +243,9 @@ static class EncryptedMessage extends AbstractFileRegion { this.buf = isByteBuf ? (ByteBuf) msg : null; this.region = isByteBuf ? 
null : (FileRegion) msg; this.transferred = 0; - this.byteRawChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE); this.cos = cos; - this.byteEncChannel = ch; + this.byteEncChannel = byteEncChannel; + this.byteRawChannel = byteRawChannel; this.count = isByteBuf ? buf.readableBytes() : region.count(); } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index d11d2e7a4f..071ea50e9b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -892,7 +892,7 @@ private[spark] class DechunkedInputStream(wrapped: InputStream) extends InputStr } } assert(destSpace == 0 || remainingInChunk == -1) - return destPos - off + destPos - off } override def close(): Unit = wrapped.close() diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala index 75dc3982ab..2879124902 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala @@ -123,7 +123,7 @@ private[spark] object SerializationDebugger extends Logging { } i += 1 } - return List.empty + List.empty } /** @@ -145,7 +145,7 @@ private[spark] object SerializationDebugger extends Logging { } i += 1 } - return List.empty + List.empty } private def visitSerializable(o: Object, stack: List[String]): List[String] = { @@ -212,7 +212,7 @@ private[spark] object SerializationDebugger extends Logging { } i += 1 } - return List.empty + List.empty } /** @@ -249,7 +249,7 @@ private[spark] object SerializationDebugger extends Logging { } else { visited ++= innerObjectsCatcher.outputArray } - return List.empty + List.empty } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala 
b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index 3cbf30160e..8613fe11a4 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -80,9 +80,9 @@ private[spark] class SortShuffleWriter[K, V, C]( } stopping = true if (success) { - return Option(mapStatus) + Option(mapStatus) } else { - return None + None } } finally { // Clean up our sorter, which may have its own intermediate files diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f4adbc7ccb..28410f8788 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1831,7 +1831,7 @@ private[spark] class BlockManager( } logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}") - return true + true } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala index cb01faf7d4..7bfde9cedc 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala @@ -345,7 +345,7 @@ private[storage] class BlockManagerDecommissioner( s"process: ${blocksFailedReplication.mkString(",")}") return true } - return false + false } private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = { diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 55d13801d4..d37a9a09cc 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -156,7 +156,7 @@ object SizeEstimator extends Logging { val 
guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024) val guessInWords = if (guess) "yes" else "not" logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords) - return guess + guess } } diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1 index 0ad7a4f5e5..d57b536038 100644 --- a/dev/appveyor-install-dependencies.ps1 +++ b/dev/appveyor-install-dependencies.ps1 @@ -97,7 +97,7 @@ if (!(Test-Path $tools)) { # ========================== SBT Push-Location $tools -$sbtVer = "1.6.2" +$sbtVer = "1.7.0" Start-FileDownload "https://github.com/sbt/sbt/releases/download/v$sbtVer/sbt-$sbtVer.zip" "sbt.zip" # extract diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3 index e3342935c1..ccc0d60701 100644 --- a/dev/deps/spark-deps-hadoop-2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2-hive-2.3 @@ -145,10 +145,10 @@ jersey-hk2/2.35//jersey-hk2-2.35.jar jersey-server/2.35//jersey-server-2.35.jar jetty-sslengine/6.1.26//jetty-sslengine-6.1.26.jar jetty-util/6.1.26//jetty-util-6.1.26.jar -jetty-util/9.4.46.v20220331//jetty-util-9.4.46.v20220331.jar +jetty-util/9.4.48.v20220622//jetty-util-9.4.48.v20220622.jar jetty/6.1.26//jetty-6.1.26.jar jline/2.14.6//jline-2.14.6.jar -joda-time/2.10.13//joda-time-2.10.13.jar +joda-time/2.10.14//joda-time-2.10.14.jar jodd-core/3.5.2//jodd-core-3.5.2.jar jpam/1.1//jpam-1.1.jar json/1.8//json-1.8.jar diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index a2dfd894af..31ada151e2 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -132,10 +132,10 @@ jersey-container-servlet/2.35//jersey-container-servlet-2.35.jar jersey-hk2/2.35//jersey-hk2-2.35.jar jersey-server/2.35//jersey-server-2.35.jar jettison/1.1//jettison-1.1.jar -jetty-util-ajax/9.4.46.v20220331//jetty-util-ajax-9.4.46.v20220331.jar -jetty-util/9.4.46.v20220331//jetty-util-9.4.46.v20220331.jar 
+jetty-util-ajax/9.4.48.v20220622//jetty-util-ajax-9.4.48.v20220622.jar +jetty-util/9.4.48.v20220622//jetty-util-9.4.48.v20220622.jar jline/2.14.6//jline-2.14.6.jar -joda-time/2.10.13//joda-time-2.10.13.jar +joda-time/2.10.14//joda-time-2.10.14.jar jodd-core/3.5.2//jodd-core-3.5.2.jar jpam/1.1//jpam-1.1.jar json/1.8//json-1.8.jar diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile index e3ba4f6110..7c46058a28 100644 --- a/dev/infra/Dockerfile +++ b/dev/infra/Dockerfile @@ -54,3 +54,6 @@ RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-c RUN apt update RUN $APT_INSTALL r-base libcurl4-openssl-dev qpdf libssl-dev zlib1g-dev RUN Rscript -e "install.packages(c('knitr', 'markdown', 'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow', 'roxygen2', 'xml2'), repos='https://cloud.r-project.org/')" + +# See more in SPARK-39735 +ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library" diff --git a/pom.xml b/pom.xml index 5465ca50e4..1976ea9db9 100644 --- a/pom.xml +++ b/pom.xml @@ -133,7 +133,7 @@ 10.14.2.0 1.12.3 1.7.5 - 9.4.46.v20220331 + 9.4.48.v20220622 4.0.3 0.10.0 2.5.0 @@ -191,7 +191,7 @@ 14.0.1 3.0.16 2.35 - 2.10.13 + 2.10.14 3.5.2 3.0.0 0.12.0 diff --git a/project/build.properties b/project/build.properties index 8599f07ab2..ec1c79b4b0 100644 --- a/project/build.properties +++ b/project/build.properties @@ -15,4 +15,4 @@ # limitations under the License. # # Please update the version in appveyor-install-dependencies.ps1 together. 
-sbt.version=1.6.2 +sbt.version=1.7.0 diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 3112690cc6..db99dbfc40 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2519,19 +2519,25 @@ def to_utc_timestamp(timestamp: "ColumnOrName", tz: "ColumnOrName") -> Column: def timestamp_seconds(col: "ColumnOrName") -> Column: """ + Converts the number of seconds from the Unix epoch (1970-01-01T00:00:00Z) + to a timestamp. + .. versionadded:: 3.1.0 Examples -------- >>> from pyspark.sql.functions import timestamp_seconds - >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles") + >>> spark.conf.set("spark.sql.session.timeZone", "UTC") >>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time']) >>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).show() +-------------------+ | ts| +-------------------+ - |2008-12-25 07:30:00| + |2008-12-25 15:30:00| +-------------------+ + >>> time_df.select(timestamp_seconds('unix_time').alias('ts')).printSchema() + root + |-- ts: timestamp (nullable = true) >>> spark.conf.unset("spark.sql.session.timeZone") """ diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 5c6acaffa3..5091fa711a 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -16,6 +16,7 @@ # import datetime +from inspect import getmembers, isfunction from itertools import chain import re import math @@ -51,10 +52,65 @@ slice, least, ) +from pyspark.sql import functions from pyspark.testing.sqlutils import ReusedSQLTestCase, SQLTestUtils class FunctionsTests(ReusedSQLTestCase): + def test_function_parity(self): + # This test compares the available list of functions in pyspark.sql.functions with those + # available in the Scala/Java DataFrame API in org.apache.spark.sql.functions. 
+ # + # NOTE FOR DEVELOPERS: + # If this test fails one of the following needs to happen + # * If a function was added to org.apache.spark.sql.functions it either needs to be added to + # pyspark.sql.functions or added to the below expected_missing_in_py set. + # * If a function was added to pyspark.sql.functions that was already in + # org.apache.spark.sql.functions then it needs to be removed from expected_missing_in_py + # below. If the function has a different name it needs to be added to py_equiv_jvm + # mapping. + # * If it's not related to an added/removed function then likely the exclusion list + # jvm_excluded_fn needs to be updated. + + jvm_fn_set = {name for (name, value) in getmembers(self.sc._jvm.functions)} + py_fn_set = {name for (name, value) in getmembers(functions, isfunction) if name[0] != "_"} + + # Functions on the JVM side we do not expect to be available in python because they are + # deprecated, irrelevant to python, or have equivalents. + jvm_excluded_fn = [ + "callUDF", # deprecated, use call_udf + "typedlit", # Scala only + "typedLit", # Scala only + "monotonicallyIncreasingId", # deprecated, use monotonically_increasing_id + "negate", # equivalent to python -expression + "not", # equivalent to python ~expression + "udaf", # used for creating UDAFs which are not supported in PySpark + ] + + jvm_fn_set.difference_update(jvm_excluded_fn) + + # For functions that are named differently in pyspark this is the mapping of their + # python name to the JVM equivalent + py_equiv_jvm = {"create_map": "map"} + for py_name, jvm_name in py_equiv_jvm.items(): + if py_name in py_fn_set: + py_fn_set.remove(py_name) + py_fn_set.add(jvm_name) + + missing_in_py = jvm_fn_set.difference(py_fn_set) + + # Functions that we expect to be missing in python until they are added to pyspark + expected_missing_in_py = { + "call_udf", # TODO(SPARK-39734) + "localtimestamp", # TODO(SPARK-36259) + "map_contains_key", # TODO(SPARK-39733) + "pmod", # TODO(SPARK-37348) +
} + + self.assertEqual( + expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected" + ) + def test_explode(self): from pyspark.sql.functions import explode, explode_outer, posexplode_outer diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala index 8cc85391eb..7d4758ec0a 100644 --- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala +++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala @@ -213,7 +213,7 @@ class ExecutorClassLoader( ClassWriter.COMPUTE_FRAMES + ClassWriter.COMPUTE_MAXS) val cleaner = new ConstructorCleaner(name, cw) cr.accept(cleaner, 0) - return cw.toByteArray + cw.toByteArray } else { // Pass the class through unmodified val bos = new ByteArrayOutputStream @@ -227,7 +227,7 @@ class ExecutorClassLoader( done = true } } - return bos.toByteArray + bos.toByteArray } } @@ -257,9 +257,9 @@ extends ClassVisitor(ASM9, cv) { mv.visitInsn(RETURN) mv.visitMaxs(-1, -1) // stack size and local vars will be auto-computed mv.visitEnd() - return null + null } else { - return mv + mv } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala index c371f0b40c..4051898295 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala @@ -278,7 +278,7 @@ case class RegrSlope(left: Expression, right: Expression) extends DeclarativeAgg covarPop.mergeExpressions ++ varPop.mergeExpressions override lazy val evaluateExpression: Expression = { - If(covarPop.n === 0.0, Literal.create(null, DoubleType), covarPop.ck / varPop.m2) + If(varPop.m2 === 0.0, Literal.create(null, DoubleType), 
covarPop.ck / varPop.m2) } override lazy val inputAggBufferAttributes: Seq[AttributeReference] = @@ -331,7 +331,7 @@ case class RegrIntercept(left: Expression, right: Expression) extends Declarativ covarPop.mergeExpressions ++ varPop.mergeExpressions override lazy val evaluateExpression: Expression = { - If(covarPop.n === 0.0, Literal.create(null, DoubleType), + If(varPop.m2 === 0.0, Literal.create(null, DoubleType), covarPop.yAvg - covarPop.ck / varPop.m2 * covarPop.xAvg) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index bfdf9942a5..a3129f249c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -38,8 +38,8 @@ import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOut import org.apache.spark.executor.CommitDeniedException import org.apache.spark.launcher.SparkLauncher import org.apache.spark.memory.SparkOutOfMemoryError +import org.apache.spark.sql.catalyst.{TableIdentifier, WalkedTypePath} import org.apache.spark.sql.catalyst.ScalaReflection.Schema -import org.apache.spark.sql.catalyst.WalkedTypePath import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression} @@ -1755,11 +1755,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase { s" ${maxBroadcastTableBytes >> 30}GB: ${dataSize >> 30} GB") } - def notEnoughMemoryToBuildAndBroadcastTableError(oe: OutOfMemoryError): Throwable = { + def notEnoughMemoryToBuildAndBroadcastTableError( + oe: OutOfMemoryError, tables: Seq[TableIdentifier]): Throwable = { + val analyzeTblMsg = if (tables.nonEmpty) { + " or analyze these tables through: " + + 
s"${tables.map(t => s"ANALYZE TABLE $t COMPUTE STATISTICS;").mkString(" ")}." + } else { + "." + } new OutOfMemoryError("Not enough memory to build and broadcast the table to all " + "worker nodes. As a workaround, you can either disable broadcast by setting " + s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " + - s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.") + s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value$analyzeTblMsg") .initCause(oe.getCause) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index f58afcfa05..f505f55c25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -217,7 +217,7 @@ private[sql] object SQLUtils extends Logging { case _ => sparkSession.catalog.currentDatabase } - sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray + sparkSession.catalog.listTables(db).collect().map(_.name) } def createArrayType(column: Column): ArrayType = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 7859785da8..accd0a064e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning} import org.apache.spark.sql.errors.QueryExecutionErrors -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution} +import 
org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.joins.{HashedRelation, HashedRelationBroadcastMode} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} @@ -179,8 +179,9 @@ case class BroadcastExchangeExec( // SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult // will catch this exception and re-throw the wrapped fatal throwable. case oe: OutOfMemoryError => + val tables = child.collect { case f: FileSourceScanExec => f.tableIdentifier }.flatten val ex = new SparkFatalException( - QueryExecutionErrors.notEnoughMemoryToBuildAndBroadcastTableError(oe)) + QueryExecutionErrors.notEnoughMemoryToBuildAndBroadcastTableError(oe, tables)) promise.tryFailure(ex) throw ex case e if !NonFatal(e) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 814a2e472f..c056baba8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -3826,7 +3826,8 @@ object functions { } /** - * Creates timestamp from the number of seconds since UTC epoch. + * Converts the number of seconds from the Unix epoch (1970-01-01T00:00:00Z) + * to a timestamp. 
* @group datetime_funcs * @since 3.1.0 */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala index 978e3f8d36..321a838f27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala @@ -672,6 +672,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { } } } + + test("SPARK-35359: create table and insert data over length values") { + Seq("char", "varchar").foreach { typ => + withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) { + withTable("t") { + sql(s"CREATE TABLE t (col $typ(2)) using $format") + sql("INSERT INTO t SELECT 'aaa'") + checkAnswer(sql("select * from t"), Row("aaa")) + } + } + } + } } // Some basic char/varchar tests which doesn't rely on table implementation. @@ -799,7 +811,6 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa withTable("t") { sql("SELECT '12' as col").write.format(format).save(dir.toString) sql(s"CREATE TABLE t (col $typ(2)) using $format LOCATION '$dir'") - val df = sql("select * from t") checkAnswer(sql("select * from t"), Row("12")) } } @@ -818,18 +829,6 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa } } - test("SPARK-35359: create table and insert data over length values") { - Seq("char", "varchar").foreach { typ => - withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) { - withTable("t") { - sql(s"CREATE TABLE t (col $typ(2)) using $format") - sql("INSERT INTO t SELECT 'aaa'") - checkAnswer(sql("select * from t"), Row("aaa")) - } - } - } - } - test("alter table set location w/ fit length values") { Seq("char", "varchar").foreach { typ => withTempPath { dir =>