diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 570f721ab4..3937791421 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -476,8 +476,11 @@ export("as.DataFrame",
"createTable",
"currentCatalog",
"currentDatabase",
+ "databaseExists",
"dropTempTable",
"dropTempView",
+ "getDatabase",
+ "getTable",
"listCatalogs",
"listColumns",
"listDatabases",
@@ -503,6 +506,7 @@ export("as.DataFrame",
"spark.getSparkFiles",
"sql",
"str",
+ "tableExists",
"tableToDF",
"tableNames",
"tables",
diff --git a/R/pkg/R/catalog.R b/R/pkg/R/catalog.R
index b10f73fb34..680415ea6c 100644
--- a/R/pkg/R/catalog.R
+++ b/R/pkg/R/catalog.R
@@ -118,6 +118,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema =
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
+#' The table name can be fully qualified with catalog name since 3.4.0.
#' @param path (optional) the path of files to load.
#' @param source (optional) the name of the data source.
#' @param schema (optional) the schema of the data required for some data sources.
@@ -129,7 +130,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema =
#' sparkR.session()
#' df <- createTable("myjson", path="path/to/json", source="json", schema)
#'
-#' createTable("people", source = "json", schema = schema)
+#' createTable("spark_catalog.default.people", source = "json", schema = schema)
#' insertInto(df, "people")
#' }
#' @name createTable
@@ -160,6 +161,7 @@ createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ..
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
+#' The table name can be fully qualified with catalog name since 3.4.0.
#' @return SparkDataFrame
#' @rdname cacheTable
#' @examples
@@ -184,6 +186,7 @@ cacheTable <- function(tableName) {
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
+#' The table name can be fully qualified with catalog name since 3.4.0.
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @examples
@@ -275,13 +278,14 @@ dropTempView <- function(viewName) {
#' Returns a SparkDataFrame containing names of tables in the given database.
#'
#' @param databaseName (optional) name of the database
+#' The database name can be qualified with catalog name since 3.4.0.
#' @return a SparkDataFrame
#' @rdname tables
#' @seealso \link{listTables}
#' @examples
#'\dontrun{
#' sparkR.session()
-#' tables("hive")
+#' tables("spark_catalog.hive")
#' }
#' @name tables
#' @note tables since 1.4.0
@@ -295,12 +299,13 @@ tables <- function(databaseName = NULL) {
#' Returns the names of tables in the given database as an array.
#'
#' @param databaseName (optional) name of the database
+#' The database name can be qualified with catalog name since 3.4.0.
#' @return a list of table names
#' @rdname tableNames
#' @examples
#'\dontrun{
#' sparkR.session()
-#' tableNames("hive")
+#' tableNames("spark_catalog.hive")
#' }
#' @name tableNames
#' @note tableNames since 1.4.0
@@ -353,6 +358,28 @@ setCurrentDatabase <- function(databaseName) {
invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
}
+#' Checks if the database with the specified name exists.
+#'
+#' Checks if the database with the specified name exists.
+#'
+#' @param databaseName name of the database, allowed to be qualified with catalog name
+#' @rdname databaseExists
+#' @name databaseExists
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' databaseExists("spark_catalog.default")
+#' }
+#' @note since 3.4.0
+databaseExists <- function(databaseName) {
+ sparkSession <- getSparkSession()
+ if (class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ callJMethod(catalog, "databaseExists", databaseName)
+}
+
#' Returns a list of databases available
#'
#' Returns a list of databases available.
@@ -372,12 +399,54 @@ listDatabases <- function() {
dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
}
+#' Get the database with the specified name
+#'
+#' Get the database with the specified name
+#'
+#' @param databaseName name of the database, allowed to be qualified with catalog name
+#' @return A named list.
+#' @rdname getDatabase
+#' @name getDatabase
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' db <- getDatabase("default")
+#' }
+#' @note since 3.4.0
+getDatabase <- function(databaseName) {
+ sparkSession <- getSparkSession()
+ if (class(databaseName) != "character") {
+ stop("databaseName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ jdb <- handledCallJMethod(catalog, "getDatabase", databaseName)
+
+ ret <- list(name = callJMethod(jdb, "name"))
+ jcata <- callJMethod(jdb, "catalog")
+ if (is.null(jcata)) {
+ ret$catalog <- NA
+ } else {
+ ret$catalog <- jcata
+ }
+
+ jdesc <- callJMethod(jdb, "description")
+ if (is.null(jdesc)) {
+ ret$description <- NA
+ } else {
+ ret$description <- jdesc
+ }
+
+ ret$locationUri <- callJMethod(jdb, "locationUri")
+ ret
+}
+
#' Returns a list of tables or views in the specified database
#'
#' Returns a list of tables or views in the specified database.
#' This includes all temporary views.
#'
#' @param databaseName (optional) name of the database
+#' The database name can be qualified with catalog name since 3.4.0.
#' @return a SparkDataFrame of the list of tables.
#' @rdname listTables
#' @name listTables
@@ -386,7 +455,7 @@ listDatabases <- function() {
#' \dontrun{
#' sparkR.session()
#' listTables()
-#' listTables("default")
+#' listTables("spark_catalog.default")
#' }
#' @note since 2.2.0
listTables <- function(databaseName = NULL) {
@@ -403,6 +472,78 @@ listTables <- function(databaseName = NULL) {
dataFrame(callJMethod(jdst, "toDF"))
}
+#' Checks if the table with the specified name exists.
+#'
+#' Checks if the table with the specified name exists.
+#'
+#' @param tableName name of the table, allowed to be qualified with catalog name
+#' @rdname tableExists
+#' @name tableExists
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' tableExists("spark_catalog.default.myTable")
+#' }
+#' @note since 3.4.0
+tableExists <- function(tableName) {
+ sparkSession <- getSparkSession()
+ if (class(tableName) != "character") {
+ stop("tableName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ callJMethod(catalog, "tableExists", tableName)
+}
+
+#' Get the table with the specified name
+#'
+#' Get the table with the specified name
+#'
+#' @param tableName the qualified or unqualified name that designates a table, allowed to be
+#' qualified with catalog name
+#' @return A named list.
+#' @rdname getTable
+#' @name getTable
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' tbl <- getTable("spark_catalog.default.myTable")
+#' }
+#' @note since 3.4.0
+getTable <- function(tableName) {
+ sparkSession <- getSparkSession()
+ if (class(tableName) != "character") {
+ stop("tableName must be a string.")
+ }
+ catalog <- callJMethod(sparkSession, "catalog")
+ jtbl <- handledCallJMethod(catalog, "getTable", tableName)
+
+ ret <- list(name = callJMethod(jtbl, "name"))
+ jcata <- callJMethod(jtbl, "catalog")
+ if (is.null(jcata)) {
+ ret$catalog <- NA
+ } else {
+ ret$catalog <- jcata
+ }
+
+ jns <- callJMethod(jtbl, "namespace")
+ if (is.null(jns)) {
+ ret$namespace <- NA
+ } else {
+ ret$namespace <- jns
+ }
+
+ jdesc <- callJMethod(jtbl, "description")
+ if (is.null(jdesc)) {
+ ret$description <- NA
+ } else {
+ ret$description <- jdesc
+ }
+
+ ret$tableType <- callJMethod(jtbl, "tableType")
+ ret$isTemporary <- callJMethod(jtbl, "isTemporary")
+ ret
+}
+
#' Returns a list of columns for the given table/view in the specified database
#'
#' Returns a list of columns for the given table/view in the specified database.
@@ -410,6 +551,8 @@ listTables <- function(databaseName = NULL) {
#' @param tableName the qualified or unqualified name that designates a table/view. If no database
#' identifier is provided, it refers to a table/view in the current database.
#' If \code{databaseName} parameter is specified, this must be an unqualified name.
+#' The table name can be qualified with catalog name since 3.4.0, when databaseName
+#' is NULL.
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of column descriptions.
#' @rdname listColumns
@@ -417,7 +560,7 @@ listTables <- function(databaseName = NULL) {
#' @examples
#' \dontrun{
#' sparkR.session()
-#' listColumns("mytable")
+#' listColumns("spark_catalog.default.mytable")
#' }
#' @note since 2.2.0
listColumns <- function(tableName, databaseName = NULL) {
@@ -470,12 +613,13 @@ listFunctions <- function(databaseName = NULL) {
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
+#' The table name can be fully qualified with catalog name since 3.4.0.
#' @rdname recoverPartitions
#' @name recoverPartitions
#' @examples
#' \dontrun{
#' sparkR.session()
-#' recoverPartitions("myTable")
+#' recoverPartitions("spark_catalog.default.myTable")
#' }
#' @note since 2.2.0
recoverPartitions <- function(tableName) {
@@ -496,12 +640,13 @@ recoverPartitions <- function(tableName) {
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
+#' The table name can be fully qualified with catalog name since 3.4.0.
#' @rdname refreshTable
#' @name refreshTable
#' @examples
#' \dontrun{
#' sparkR.session()
-#' refreshTable("myTable")
+#' refreshTable("spark_catalog.default.myTable")
#' }
#' @note since 2.2.0
refreshTable <- function(tableName) {
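The four SparkR additions above (databaseExists, getDatabase, tableExists, getTable) are thin wrappers over the JVM Catalog methods of the same names. A minimal usage sketch, assuming an active SparkR session; the people table is only an illustration borrowed from the tests further down in this patch:

    sparkR.session()

    databaseExists("spark_catalog.default")          # TRUE
    db <- getDatabase("default")                     # named list: name, catalog, description, locationUri

    schema <- structType(structField("name", "string"), structField("age", "integer"))
    createTable("spark_catalog.default.people", source = "json", schema = schema)

    tableExists("default.people")                    # TRUE
    tbl <- getTable("spark_catalog.default.people")  # named list incl. namespace, tableType, isTemporary

    sql("DROP TABLE IF EXISTS spark_catalog.default.people")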
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 1377f0daa7..d772c9bd4e 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -3256,7 +3256,8 @@ setMethod("format_string", signature(format = "character", x = "Column"),
#' tmp <- mutate(df, to_unix = unix_timestamp(df$time),
#' to_unix2 = unix_timestamp(df$time, 'yyyy-MM-dd HH'),
#' from_unix = from_unixtime(unix_timestamp(df$time)),
-#' from_unix2 = from_unixtime(unix_timestamp(df$time), 'yyyy-MM-dd HH:mm'))
+#' from_unix2 = from_unixtime(unix_timestamp(df$time), 'yyyy-MM-dd HH:mm'),
+#' timestamp_from_unix = timestamp_seconds(unix_timestamp(df$time)))
#' head(tmp)}
#' @note from_unixtime since 1.5.0
setMethod("from_unixtime", signature(x = "Column"),
@@ -4854,7 +4855,8 @@ setMethod("current_timestamp",
})
#' @details
-#' \code{timestamp_seconds}: Creates timestamp from the number of seconds since UTC epoch.
+#' \code{timestamp_seconds}: Converts the number of seconds from the Unix epoch
+#' (1970-01-01T00:00:00Z) to a timestamp.
#'
#' @rdname column_datetime_functions
#' @aliases timestamp_seconds timestamp_seconds,Column-method
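A short SparkR sketch of the reworded timestamp_seconds behaviour, assuming a session with the default time zone; the sample value is illustrative only:

    df <- createDataFrame(data.frame(time = "2015-04-08 13:10:15"))
    tmp <- mutate(df, to_unix = unix_timestamp(df$time),
                  ts = timestamp_seconds(unix_timestamp(df$time)))
    head(tmp)    # ts is a timestamp column rebuilt from seconds since 1970-01-01T00:00:00Z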
diff --git a/R/pkg/pkgdown/_pkgdown_template.yml b/R/pkg/pkgdown/_pkgdown_template.yml
index d487b51ec5..df93f200ab 100644
--- a/R/pkg/pkgdown/_pkgdown_template.yml
+++ b/R/pkg/pkgdown/_pkgdown_template.yml
@@ -263,8 +263,11 @@ reference:
- contents:
- currentCatalog
- currentDatabase
+ - databaseExists
- dropTempTable
- dropTempView
+ - getDatabase
+ - getTable
- listCatalogs
- listColumns
- listDatabases
@@ -275,6 +278,7 @@ reference:
- recoverPartitions
- setCurrentCatalog
- setCurrentDatabase
+ - tableExists
- tableNames
- tables
- uncacheTable
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 9586d8b45a..85eca6b510 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -673,6 +673,22 @@ test_that("test tableNames and tables", {
tables <- listTables()
expect_equal(count(tables), count + 0)
+
+ count2 <- count(listTables())
+ schema <- structType(structField("name", "string"), structField("age", "integer"),
+ structField("height", "float"))
+ createTable("people", source = "json", schema = schema)
+
+ expect_equal(length(tableNames()), count2 + 1)
+ expect_equal(length(tableNames("default")), count2 + 1)
+ expect_equal(length(tableNames("spark_catalog.default")), count2 + 1)
+
+ tables <- listTables()
+ expect_equal(count(tables), count2 + 1)
+ expect_equal(count(tables()), count(tables))
+ expect_equal(count(tables("default")), count2 + 1)
+ expect_equal(count(tables("spark_catalog.default")), count2 + 1)
+ sql("DROP TABLE IF EXISTS people")
})
test_that(
@@ -696,16 +712,27 @@ test_that(
expect_true(dropTempView("dfView"))
})
-test_that("test cache, uncache and clearCache", {
- df <- read.json(jsonPath)
- createOrReplaceTempView(df, "table1")
- cacheTable("table1")
- uncacheTable("table1")
+test_that("test tableExists, cache, uncache and clearCache", {
+ schema <- structType(structField("name", "string"), structField("age", "integer"),
+ structField("height", "float"))
+ createTable("table1", source = "json", schema = schema)
+
+ cacheTable("default.table1")
+ uncacheTable("spark_catalog.default.table1")
clearCache()
- expect_true(dropTempView("table1"))
expect_error(uncacheTable("zxwtyswklpf"),
"Error in uncacheTable : analysis error - Table or view not found: zxwtyswklpf")
+
+ expect_true(tableExists("table1"))
+ expect_true(tableExists("default.table1"))
+ expect_true(tableExists("spark_catalog.default.table1"))
+
+ sql("DROP TABLE IF EXISTS spark_catalog.default.table1")
+
+ expect_false(tableExists("table1"))
+ expect_false(tableExists("default.table1"))
+ expect_false(tableExists("spark_catalog.default.table1"))
})
test_that("insertInto() on a registered table", {
@@ -1342,7 +1369,7 @@ test_that("test HiveContext", {
schema <- structType(structField("name", "string"), structField("age", "integer"),
structField("height", "float"))
- createTable("people", source = "json", schema = schema)
+ createTable("spark_catalog.default.people", source = "json", schema = schema)
df <- read.df(jsonPathNa, "json", schema)
insertInto(df, "people")
expect_equal(collect(sql("SELECT age from people WHERE name = 'Bob'"))$age, c(16))
@@ -3411,6 +3438,8 @@ test_that("Method coltypes() to get and set R's data types of a DataFrame", {
"Length of type vector should match the number of columns for SparkDataFrame")
expect_error(coltypes(df) <- c("environment", "list"),
"Only atomic type is supported for column types")
+
+ dropTempView("dfView")
})
test_that("Method str()", {
@@ -3450,6 +3479,8 @@ test_that("Method str()", {
# Test utils:::str
expect_equal(capture.output(utils:::str(iris)), capture.output(str(iris)))
+
+ dropTempView("irisView")
})
test_that("Histogram", {
@@ -4022,20 +4053,32 @@ test_that("catalog APIs, listCatalogs, setCurrentCatalog, currentCatalog", {
catalogs <- collect(listCatalogs())
})
-test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
+test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases, getDatabase", {
expect_equal(currentDatabase(), "default")
expect_error(setCurrentDatabase("default"), NA)
expect_error(setCurrentDatabase("zxwtyswklpf"),
paste0("Error in setCurrentDatabase : no such database - Database ",
"'zxwtyswklpf' not found"))
+
+ expect_true(databaseExists("default"))
+ expect_true(databaseExists("spark_catalog.default"))
+ expect_false(databaseExists("some_db"))
+ expect_false(databaseExists("spark_catalog.some_db"))
+
dbs <- collect(listDatabases())
expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
expect_equal(which(dbs[, 1] == "default"), 1)
+
+ db <- getDatabase("spark_catalog.default")
+ expect_equal(db$name, "default")
+ expect_equal(db$catalog, "spark_catalog")
})
-test_that("catalog APIs, listTables, listColumns, listFunctions", {
+test_that("catalog APIs, listTables, listColumns, listFunctions, getTable", {
tb <- listTables()
count <- count(tables())
+ expect_equal(nrow(listTables("default")), count)
+ expect_equal(nrow(listTables("spark_catalog.default")), count)
expect_equal(nrow(tb), count)
expect_equal(colnames(tb),
c("name", "catalog", "namespace", "description", "tableType", "isTemporary"))
@@ -4075,7 +4118,26 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", {
expect_error(refreshTable("cars"), NA)
expect_error(refreshByPath("/"), NA)
+ view <- getTable("cars")
+ expect_equal(view$name, "cars")
+ expect_equal(view$tableType, "TEMPORARY")
+ expect_true(view$isTemporary)
+
dropTempView("cars")
+
+ schema <- structType(structField("name", "string"), structField("age", "integer"),
+ structField("height", "float"))
+ createTable("default.people", source = "json", schema = schema)
+
+ tbl <- getTable("spark_catalog.default.people")
+ expect_equal(tbl$name, "people")
+ expect_equal(tbl$catalog, "spark_catalog")
+ expect_equal(length(tbl$namespace), 1)
+ expect_equal(tbl$namespace[[1]], "default")
+ expect_equal(tbl$tableType, "MANAGED")
+ expect_false(tbl$isTemporary)
+
+ sql("DROP TABLE IF EXISTS people")
})
test_that("assert_true, raise_error", {
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
index a7e5831846..a15d07cf59 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
@@ -48,7 +48,6 @@ public KVTypeInfo(Class<?> type) {
checkIndex(idx, indices);
f.setAccessible(true);
indices.put(idx.value(), idx);
- f.setAccessible(true);
accessors.put(idx.value(), new FieldAccessor(f));
}
}
@@ -61,7 +60,6 @@ public KVTypeInfo(Class<?> type) {
"Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
m.setAccessible(true);
indices.put(idx.value(), idx);
- m.setAccessible(true);
accessors.put(idx.value(), new MethodAccessor(m));
}
}
diff --git a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
index 36ca73f6ac..b507f911fe 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/crypto/TransportCipher.java
@@ -109,13 +109,15 @@ public void addToChannel(Channel ch) throws IOException {
@VisibleForTesting
static class EncryptionHandler extends ChannelOutboundHandlerAdapter {
- private final ByteArrayWritableChannel byteChannel;
+ private final ByteArrayWritableChannel byteEncChannel;
private final CryptoOutputStream cos;
+ private final ByteArrayWritableChannel byteRawChannel;
private boolean isCipherValid;
EncryptionHandler(TransportCipher cipher) throws IOException {
- byteChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE);
- cos = cipher.createOutputStream(byteChannel);
+ byteEncChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE);
+ cos = cipher.createOutputStream(byteEncChannel);
+ byteRawChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE);
isCipherValid = true;
}
@@ -127,7 +129,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
@VisibleForTesting
EncryptedMessage createEncryptedMessage(Object msg) {
- return new EncryptedMessage(this, cos, msg, byteChannel);
+ return new EncryptedMessage(this, cos, msg, byteEncChannel, byteRawChannel);
}
@Override
@@ -223,8 +225,8 @@ static class EncryptedMessage extends AbstractFileRegion {
// Due to streaming issue CRYPTO-125: https://issues.apache.org/jira/browse/CRYPTO-125, it has
// to utilize two helper ByteArrayWritableChannel for streaming. One is used to receive raw data
// from upper handler, another is used to store encrypted data.
- private ByteArrayWritableChannel byteEncChannel;
- private ByteArrayWritableChannel byteRawChannel;
+ private final ByteArrayWritableChannel byteEncChannel;
+ private final ByteArrayWritableChannel byteRawChannel;
private ByteBuffer currentEncrypted;
@@ -232,7 +234,8 @@ static class EncryptedMessage extends AbstractFileRegion {
EncryptionHandler handler,
CryptoOutputStream cos,
Object msg,
- ByteArrayWritableChannel ch) {
+ ByteArrayWritableChannel byteEncChannel,
+ ByteArrayWritableChannel byteRawChannel) {
Preconditions.checkArgument(msg instanceof ByteBuf || msg instanceof FileRegion,
"Unrecognized message type: %s", msg.getClass().getName());
this.handler = handler;
@@ -240,9 +243,9 @@ static class EncryptedMessage extends AbstractFileRegion {
this.buf = isByteBuf ? (ByteBuf) msg : null;
this.region = isByteBuf ? null : (FileRegion) msg;
this.transferred = 0;
- this.byteRawChannel = new ByteArrayWritableChannel(STREAM_BUFFER_SIZE);
this.cos = cos;
- this.byteEncChannel = ch;
+ this.byteEncChannel = byteEncChannel;
+ this.byteRawChannel = byteRawChannel;
this.count = isByteBuf ? buf.readableBytes() : region.count();
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index d11d2e7a4f..071ea50e9b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -892,7 +892,7 @@ private[spark] class DechunkedInputStream(wrapped: InputStream) extends InputStr
}
}
assert(destSpace == 0 || remainingInChunk == -1)
- return destPos - off
+ destPos - off
}
override def close(): Unit = wrapped.close()
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
index 75dc3982ab..2879124902 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
@@ -123,7 +123,7 @@ private[spark] object SerializationDebugger extends Logging {
}
i += 1
}
- return List.empty
+ List.empty
}
/**
@@ -145,7 +145,7 @@ private[spark] object SerializationDebugger extends Logging {
}
i += 1
}
- return List.empty
+ List.empty
}
private def visitSerializable(o: Object, stack: List[String]): List[String] = {
@@ -212,7 +212,7 @@ private[spark] object SerializationDebugger extends Logging {
}
i += 1
}
- return List.empty
+ List.empty
}
/**
@@ -249,7 +249,7 @@ private[spark] object SerializationDebugger extends Logging {
} else {
visited ++= innerObjectsCatcher.outputArray
}
- return List.empty
+ List.empty
}
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 3cbf30160e..8613fe11a4 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -80,9 +80,9 @@ private[spark] class SortShuffleWriter[K, V, C](
}
stopping = true
if (success) {
- return Option(mapStatus)
+ Option(mapStatus)
} else {
- return None
+ None
}
} finally {
// Clean up our sorter, which may have its own intermediate files
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f4adbc7ccb..28410f8788 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1831,7 +1831,7 @@ private[spark] class BlockManager(
}
logDebug(s"block $blockId replicated to ${peersReplicatedTo.mkString(", ")}")
- return true
+ true
}
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
index cb01faf7d4..7bfde9cedc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
@@ -345,7 +345,7 @@ private[storage] class BlockManagerDecommissioner(
s"process: ${blocksFailedReplication.mkString(",")}")
return true
}
- return false
+ false
}
private def migrateBlock(blockToReplicate: ReplicateBlock): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
index 55d13801d4..d37a9a09cc 100644
--- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
+++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -156,7 +156,7 @@ object SizeEstimator extends Logging {
val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
val guessInWords = if (guess) "yes" else "not"
logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
- return guess
+ guess
}
}
diff --git a/dev/appveyor-install-dependencies.ps1 b/dev/appveyor-install-dependencies.ps1
index 0ad7a4f5e5..d57b536038 100644
--- a/dev/appveyor-install-dependencies.ps1
+++ b/dev/appveyor-install-dependencies.ps1
@@ -97,7 +97,7 @@ if (!(Test-Path $tools)) {
# ========================== SBT
Push-Location $tools
-$sbtVer = "1.6.2"
+$sbtVer = "1.7.0"
Start-FileDownload "https://github.com/sbt/sbt/releases/download/v$sbtVer/sbt-$sbtVer.zip" "sbt.zip"
# extract
diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index e3342935c1..ccc0d60701 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -145,10 +145,10 @@ jersey-hk2/2.35//jersey-hk2-2.35.jar
jersey-server/2.35//jersey-server-2.35.jar
jetty-sslengine/6.1.26//jetty-sslengine-6.1.26.jar
jetty-util/6.1.26//jetty-util-6.1.26.jar
-jetty-util/9.4.46.v20220331//jetty-util-9.4.46.v20220331.jar
+jetty-util/9.4.48.v20220622//jetty-util-9.4.48.v20220622.jar
jetty/6.1.26//jetty-6.1.26.jar
jline/2.14.6//jline-2.14.6.jar
-joda-time/2.10.13//joda-time-2.10.13.jar
+joda-time/2.10.14//joda-time-2.10.14.jar
jodd-core/3.5.2//jodd-core-3.5.2.jar
jpam/1.1//jpam-1.1.jar
json/1.8//json-1.8.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index a2dfd894af..31ada151e2 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -132,10 +132,10 @@ jersey-container-servlet/2.35//jersey-container-servlet-2.35.jar
jersey-hk2/2.35//jersey-hk2-2.35.jar
jersey-server/2.35//jersey-server-2.35.jar
jettison/1.1//jettison-1.1.jar
-jetty-util-ajax/9.4.46.v20220331//jetty-util-ajax-9.4.46.v20220331.jar
-jetty-util/9.4.46.v20220331//jetty-util-9.4.46.v20220331.jar
+jetty-util-ajax/9.4.48.v20220622//jetty-util-ajax-9.4.48.v20220622.jar
+jetty-util/9.4.48.v20220622//jetty-util-9.4.48.v20220622.jar
jline/2.14.6//jline-2.14.6.jar
-joda-time/2.10.13//joda-time-2.10.13.jar
+joda-time/2.10.14//joda-time-2.10.14.jar
jodd-core/3.5.2//jodd-core-3.5.2.jar
jpam/1.1//jpam-1.1.jar
json/1.8//json-1.8.jar
diff --git a/dev/infra/Dockerfile b/dev/infra/Dockerfile
index e3ba4f6110..7c46058a28 100644
--- a/dev/infra/Dockerfile
+++ b/dev/infra/Dockerfile
@@ -54,3 +54,6 @@ RUN add-apt-repository 'deb https://cloud.r-project.org/bin/linux/ubuntu focal-c
RUN apt update
RUN $APT_INSTALL r-base libcurl4-openssl-dev qpdf libssl-dev zlib1g-dev
RUN Rscript -e "install.packages(c('knitr', 'markdown', 'rmarkdown', 'testthat', 'devtools', 'e1071', 'survival', 'arrow', 'roxygen2', 'xml2'), repos='https://cloud.r-project.org/')"
+
+# See more in SPARK-39735
+ENV R_LIBS_SITE "/usr/local/lib/R/site-library:${R_LIBS_SITE}:/usr/lib/R/library"
diff --git a/pom.xml b/pom.xml
index 5465ca50e4..1976ea9db9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -133,7 +133,7 @@
10.14.2.0
1.12.3
1.7.5
- <jetty.version>9.4.46.v20220331</jetty.version>
+ <jetty.version>9.4.48.v20220622</jetty.version>
4.0.3
0.10.0
2.5.0
@@ -191,7 +191,7 @@
14.0.1
3.0.16
2.35
- <joda.version>2.10.13</joda.version>
+ <joda.version>2.10.14</joda.version>
3.5.2
3.0.0
0.12.0
diff --git a/project/build.properties b/project/build.properties
index 8599f07ab2..ec1c79b4b0 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -15,4 +15,4 @@
# limitations under the License.
#
# Please update the version in appveyor-install-dependencies.ps1 together.
-sbt.version=1.6.2
+sbt.version=1.7.0
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 3112690cc6..db99dbfc40 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -2519,19 +2519,25 @@ def to_utc_timestamp(timestamp: "ColumnOrName", tz: "ColumnOrName") -> Column:
def timestamp_seconds(col: "ColumnOrName") -> Column:
"""
+ Converts the number of seconds from the Unix epoch (1970-01-01T00:00:00Z)
+ to a timestamp.
+
.. versionadded:: 3.1.0
Examples
--------
>>> from pyspark.sql.functions import timestamp_seconds
- >>> spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
+ >>> spark.conf.set("spark.sql.session.timeZone", "UTC")
>>> time_df = spark.createDataFrame([(1230219000,)], ['unix_time'])
>>> time_df.select(timestamp_seconds(time_df.unix_time).alias('ts')).show()
+-------------------+
| ts|
+-------------------+
- |2008-12-25 07:30:00|
+ |2008-12-25 15:30:00|
+-------------------+
+ >>> time_df.select(timestamp_seconds('unix_time').alias('ts')).printSchema()
+ root
+ |-- ts: timestamp (nullable = true)
>>> spark.conf.unset("spark.sql.session.timeZone")
"""
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index 5c6acaffa3..5091fa711a 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -16,6 +16,7 @@
#
import datetime
+from inspect import getmembers, isfunction
from itertools import chain
import re
import math
@@ -51,10 +52,65 @@
slice,
least,
)
+from pyspark.sql import functions
from pyspark.testing.sqlutils import ReusedSQLTestCase, SQLTestUtils
class FunctionsTests(ReusedSQLTestCase):
+ def test_function_parity(self):
+ # This test compares the available list of functions in pyspark.sql.functions with those
+ # available in the Scala/Java DataFrame API in org.apache.spark.sql.functions.
+ #
+ # NOTE FOR DEVELOPERS:
+ # If this test fails, one of the following needs to happen:
+ # * If a function was added to org.apache.spark.sql.functions it either needs to be added to
+ # pyspark.sql.functions or added to the below expected_missing_in_py set.
+ # * If a function was added to pyspark.sql.functions that was already in
+ # org.apache.spark.sql.functions then it needs to be removed from expected_missing_in_py
+ # below. If the function has a different name it needs to be added to py_equiv_jvm
+ # mapping.
+ # * If it's not related to an added/removed function then likely the exclusion list
+ # jvm_excluded_fn needs to be updated.
+
+ jvm_fn_set = {name for (name, value) in getmembers(self.sc._jvm.functions)}
+ py_fn_set = {name for (name, value) in getmembers(functions, isfunction) if name[0] != "_"}
+
+ # Functions on the JVM side we do not expect to be available in python because they are
+ # deprecated, irrelevant to python, or have equivalents.
+ jvm_excluded_fn = [
+ "callUDF", # depreciated, use call_udf
+ "typedlit", # Scala only
+ "typedLit", # Scala only
+ "monotonicallyIncreasingId", # depreciated, use monotonically_increasing_id
+ "negate", # equivalent to python -expression
+ "not", # equivalent to python ~expression
+ "udaf", # used for creating UDAF's which are not supported in PySpark
+ ]
+
+ jvm_fn_set.difference_update(jvm_excluded_fn)
+
+ # For functions that are named differently in pyspark this is the mapping of their
+ # python name to the JVM equivalent
+ py_equiv_jvm = {"create_map": "map"}
+ for py_name, jvm_name in py_equiv_jvm.items():
+ if py_name in py_fn_set:
+ py_fn_set.remove(py_name)
+ py_fn_set.add(jvm_name)
+
+ missing_in_py = jvm_fn_set.difference(py_fn_set)
+
+ # Functions that we expect to be missing in python until they are added to pyspark
+ expected_missing_in_py = {
+ "call_udf", # TODO(SPARK-39734)
+ "localtimestamp", # TODO(SPARK-36259)
+ "map_contains_key", # TODO(SPARK-39733)
+ "pmod", # TODO(SPARK-37348)
+ }
+
+ self.assertEqual(
+ expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected"
+ )
+
def test_explode(self):
from pyspark.sql.functions import explode, explode_outer, posexplode_outer
diff --git a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
index 8cc85391eb..7d4758ec0a 100644
--- a/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala
@@ -213,7 +213,7 @@ class ExecutorClassLoader(
ClassWriter.COMPUTE_FRAMES + ClassWriter.COMPUTE_MAXS)
val cleaner = new ConstructorCleaner(name, cw)
cr.accept(cleaner, 0)
- return cw.toByteArray
+ cw.toByteArray
} else {
// Pass the class through unmodified
val bos = new ByteArrayOutputStream
@@ -227,7 +227,7 @@ class ExecutorClassLoader(
done = true
}
}
- return bos.toByteArray
+ bos.toByteArray
}
}
@@ -257,9 +257,9 @@ extends ClassVisitor(ASM9, cv) {
mv.visitInsn(RETURN)
mv.visitMaxs(-1, -1) // stack size and local vars will be auto-computed
mv.visitEnd()
- return null
+ null
} else {
- return mv
+ mv
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala
index c371f0b40c..4051898295 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/linearRegression.scala
@@ -278,7 +278,7 @@ case class RegrSlope(left: Expression, right: Expression) extends DeclarativeAgg
covarPop.mergeExpressions ++ varPop.mergeExpressions
override lazy val evaluateExpression: Expression = {
- If(covarPop.n === 0.0, Literal.create(null, DoubleType), covarPop.ck / varPop.m2)
+ If(varPop.m2 === 0.0, Literal.create(null, DoubleType), covarPop.ck / varPop.m2)
}
override lazy val inputAggBufferAttributes: Seq[AttributeReference] =
@@ -331,7 +331,7 @@ case class RegrIntercept(left: Expression, right: Expression) extends Declarativ
covarPop.mergeExpressions ++ varPop.mergeExpressions
override lazy val evaluateExpression: Expression = {
- If(covarPop.n === 0.0, Literal.create(null, DoubleType),
+ If(varPop.m2 === 0.0, Literal.create(null, DoubleType),
covarPop.yAvg - covarPop.ck / varPop.m2 * covarPop.xAvg)
}
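The two evaluateExpression changes above make the divide-by-zero guard depend on the regressor's population variance (varPop.m2) instead of the pair count, so a constant x now yields NULL. A hedged SparkR/SQL sketch of the expected behaviour; the view name and data are illustrative:

    df <- createDataFrame(data.frame(y = c(1, 2, 3), x = c(5, 5, 5)))
    createOrReplaceTempView(df, "points")
    collect(sql("SELECT regr_slope(y, x) AS slope, regr_intercept(y, x) AS intercept FROM points"))
    # both columns are NULL: var_pop(x) is 0 even though the row count is not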
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index bfdf9942a5..a3129f249c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -38,8 +38,8 @@ import org.apache.spark.{Partition, SparkArithmeticException, SparkArrayIndexOut
import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.memory.SparkOutOfMemoryError
+import org.apache.spark.sql.catalyst.{TableIdentifier, WalkedTypePath}
import org.apache.spark.sql.catalyst.ScalaReflection.Schema
-import org.apache.spark.sql.catalyst.WalkedTypePath
import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
@@ -1755,11 +1755,18 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
s" ${maxBroadcastTableBytes >> 30}GB: ${dataSize >> 30} GB")
}
- def notEnoughMemoryToBuildAndBroadcastTableError(oe: OutOfMemoryError): Throwable = {
+ def notEnoughMemoryToBuildAndBroadcastTableError(
+ oe: OutOfMemoryError, tables: Seq[TableIdentifier]): Throwable = {
+ val analyzeTblMsg = if (tables.nonEmpty) {
+ " or analyze these tables through: " +
+ s"${tables.map(t => s"ANALYZE TABLE $t COMPUTE STATISTICS;").mkString(" ")}."
+ } else {
+ "."
+ }
new OutOfMemoryError("Not enough memory to build and broadcast the table to all " +
"worker nodes. As a workaround, you can either disable broadcast by setting " +
s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 or increase the spark " +
- s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value.")
+ s"driver memory by setting ${SparkLauncher.DRIVER_MEMORY} to a higher value$analyzeTblMsg")
.initCause(oe.getCause)
}
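The statements embedded in the extended OOM message are ordinary Spark SQL and can be run as-is, for example from SparkR; the table name below is a placeholder:

    sql("ANALYZE TABLE spark_catalog.default.people COMPUTE STATISTICS")
    # collects table-level statistics so the planner can estimate the broadcast side more accurately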
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index f58afcfa05..f505f55c25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -217,7 +217,7 @@ private[sql] object SQLUtils extends Logging {
case _ =>
sparkSession.catalog.currentDatabase
}
- sparkSession.sessionState.catalog.listTables(db).map(_.table).toArray
+ sparkSession.catalog.listTables(db).collect().map(_.name)
}
def createArrayType(column: Column): ArrayType = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 7859785da8..accd0a064e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, BroadcastPartitioning, Partitioning}
import org.apache.spark.sql.errors.QueryExecutionErrors
-import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.joins.{HashedRelation, HashedRelationBroadcastMode}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -179,8 +179,9 @@ case class BroadcastExchangeExec(
// SparkFatalException, which is a subclass of Exception. ThreadUtils.awaitResult
// will catch this exception and re-throw the wrapped fatal throwable.
case oe: OutOfMemoryError =>
+ val tables = child.collect { case f: FileSourceScanExec => f.tableIdentifier }.flatten
val ex = new SparkFatalException(
- QueryExecutionErrors.notEnoughMemoryToBuildAndBroadcastTableError(oe))
+ QueryExecutionErrors.notEnoughMemoryToBuildAndBroadcastTableError(oe, tables))
promise.tryFailure(ex)
throw ex
case e if !NonFatal(e) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 814a2e472f..c056baba8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3826,7 +3826,8 @@ object functions {
}
/**
- * Creates timestamp from the number of seconds since UTC epoch.
+ * Converts the number of seconds from the Unix epoch (1970-01-01T00:00:00Z)
+ * to a timestamp.
* @group datetime_funcs
* @since 3.1.0
*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 978e3f8d36..321a838f27 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -672,6 +672,18 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
}
}
}
+
+ test("SPARK-35359: create table and insert data over length values") {
+ Seq("char", "varchar").foreach { typ =>
+ withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
+ withTable("t") {
+ sql(s"CREATE TABLE t (col $typ(2)) using $format")
+ sql("INSERT INTO t SELECT 'aaa'")
+ checkAnswer(sql("select * from t"), Row("aaa"))
+ }
+ }
+ }
+ }
}
// Some basic char/varchar tests which doesn't rely on table implementation.
@@ -799,7 +811,6 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
withTable("t") {
sql("SELECT '12' as col").write.format(format).save(dir.toString)
sql(s"CREATE TABLE t (col $typ(2)) using $format LOCATION '$dir'")
- val df = sql("select * from t")
checkAnswer(sql("select * from t"), Row("12"))
}
}
@@ -818,18 +829,6 @@ class FileSourceCharVarcharTestSuite extends CharVarcharTestSuite with SharedSpa
}
}
- test("SPARK-35359: create table and insert data over length values") {
- Seq("char", "varchar").foreach { typ =>
- withSQLConf((SQLConf.LEGACY_CHAR_VARCHAR_AS_STRING.key, "true")) {
- withTable("t") {
- sql(s"CREATE TABLE t (col $typ(2)) using $format")
- sql("INSERT INTO t SELECT 'aaa'")
- checkAnswer(sql("select * from t"), Row("aaa"))
- }
- }
- }
- }
-
test("alter table set location w/ fit length values") {
Seq("char", "varchar").foreach { typ =>
withTempPath { dir =>