[SPARK-39667][SQL] Add another workaround when there is not enough memory to build and broadcast the table

### What changes were proposed in this pull request?

This PR adds `ANALYZE TABLE tbl COMPUTE STATISTICS;` as a workaround when there is not enough memory to build and broadcast the table.
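As a rough illustration, the suggested workaround could be run from SparkR as below (a sketch only; `tpds5t`.`item` is the table from the manual test further down, and any table involved in the failing broadcast would do):

```r
library(SparkR)
sparkR.session()

# Compute table-level statistics so the optimizer has an accurate size
# estimate before deciding whether the table is small enough to broadcast.
sql("ANALYZE TABLE `tpds5t`.`item` COMPUTE STATISTICS")
```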

### Why are the changes needed?

The current workaround has the following disadvantages:
1. Setting `spark.sql.autoBroadcastJoinThreshold` to -1 will disable all broadcast joins.
2. We don't know what value `spark.driver.memory` should be set to.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual testing:
```
Not enough memory to build and broadcast the table to all worker nodes. As a workaround, you can either disable broadcast by setting spark.sql.autoBroadcastJoinThreshold to -1 or increase the spark driver memory by setting spark.driver.memory to a higher value or analyze these tables through: ANALYZE TABLE `tpds5t`.`item` COMPUTE STATISTICS;.
```

Closes #37069 from wangyum/SPARK-39667.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
a0x8o committed Jul 11, 2022
1 parent 2dccd73 commit 4fa25d1
Showing 28 changed files with 367 additions and 76 deletions.
4 changes: 4 additions & 0 deletions R/pkg/NAMESPACE
@@ -476,8 +476,11 @@ export("as.DataFrame",
"createTable",
"currentCatalog",
"currentDatabase",
"databaseExists",
"dropTempTable",
"dropTempView",
"getDatabase",
"getTable",
"listCatalogs",
"listColumns",
"listDatabases",
@@ -503,6 +506,7 @@ export("as.DataFrame",
"spark.getSparkFiles",
"sql",
"str",
"tableExists",
"tableToDF",
"tableNames",
"tables",
159 changes: 152 additions & 7 deletions R/pkg/R/catalog.R
@@ -118,6 +118,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema =
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' The table name can be fully qualified with catalog name since 3.4.0.
#' @param path (optional) the path of files to load.
#' @param source (optional) the name of the data source.
#' @param schema (optional) the schema of the data required for some data sources.
@@ -129,7 +130,7 @@ createExternalTable <- function(tableName, path = NULL, source = NULL, schema =
#' sparkR.session()
#' df <- createTable("myjson", path="path/to/json", source="json", schema)
#'
#' createTable("people", source = "json", schema = schema)
#' createTable("spark_catalog.default.people", source = "json", schema = schema)
#' insertInto(df, "people")
#' }
#' @name createTable
@@ -160,6 +161,7 @@ createTable <- function(tableName, path = NULL, source = NULL, schema = NULL, ..
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' The table name can be fully qualified with catalog name since 3.4.0.
#' @return SparkDataFrame
#' @rdname cacheTable
#' @examples
@@ -184,6 +186,7 @@ cacheTable <- function(tableName) {
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' The table name can be fully qualified with catalog name since 3.4.0.
#' @return SparkDataFrame
#' @rdname uncacheTable
#' @examples
@@ -275,13 +278,14 @@ dropTempView <- function(viewName) {
#' Returns a SparkDataFrame containing names of tables in the given database.
#'
#' @param databaseName (optional) name of the database
#' The database name can be qualified with catalog name since 3.4.0.
#' @return a SparkDataFrame
#' @rdname tables
#' @seealso \link{listTables}
#' @examples
#'\dontrun{
#' sparkR.session()
#' tables("hive")
#' tables("spark_catalog.hive")
#' }
#' @name tables
#' @note tables since 1.4.0
@@ -295,12 +299,13 @@ tables <- function(databaseName = NULL) {
#' Returns the names of tables in the given database as an array.
#'
#' @param databaseName (optional) name of the database
#' The database name can be qualified with catalog name since 3.4.0.
#' @return a list of table names
#' @rdname tableNames
#' @examples
#'\dontrun{
#' sparkR.session()
#' tableNames("hive")
#' tableNames("spark_catalog.hive")
#' }
#' @name tableNames
#' @note tableNames since 1.4.0
@@ -353,6 +358,28 @@ setCurrentDatabase <- function(databaseName) {
invisible(handledCallJMethod(catalog, "setCurrentDatabase", databaseName))
}

#' Checks if the database with the specified name exists.
#'
#' Checks if the database with the specified name exists.
#'
#' @param databaseName name of the database, allowed to be qualified with catalog name
#' @rdname databaseExists
#' @name databaseExists
#' @examples
#' \dontrun{
#' sparkR.session()
#' databaseExists("spark_catalog.default")
#' }
#' @note since 3.4.0
databaseExists <- function(databaseName) {
sparkSession <- getSparkSession()
if (class(databaseName) != "character") {
stop("databaseName must be a string.")
}
catalog <- callJMethod(sparkSession, "catalog")
callJMethod(catalog, "databaseExists", databaseName)
}

#' Returns a list of databases available
#'
#' Returns a list of databases available.
@@ -372,12 +399,54 @@ listDatabases <- function() {
dataFrame(callJMethod(callJMethod(catalog, "listDatabases"), "toDF"))
}

#' Get the database with the specified name
#'
#' Get the database with the specified name
#'
#' @param databaseName name of the database, allowed to be qualified with catalog name
#' @return A named list.
#' @rdname getDatabase
#' @name getDatabase
#' @examples
#' \dontrun{
#' sparkR.session()
#' db <- getDatabase("default")
#' }
#' @note since 3.4.0
getDatabase <- function(databaseName) {
sparkSession <- getSparkSession()
if (class(databaseName) != "character") {
stop("databaseName must be a string.")
}
catalog <- callJMethod(sparkSession, "catalog")
jdb <- handledCallJMethod(catalog, "getDatabase", databaseName)

ret <- list(name = callJMethod(jdb, "name"))
jcata <- callJMethod(jdb, "catalog")
if (is.null(jcata)) {
ret$catalog <- NA
} else {
ret$catalog <- jcata
}

jdesc <- callJMethod(jdb, "description")
if (is.null(jdesc)) {
ret$description <- NA
} else {
ret$description <- jdesc
}

ret$locationUri <- callJMethod(jdb, "locationUri")
ret
}

#' Returns a list of tables or views in the specified database
#'
#' Returns a list of tables or views in the specified database.
#' This includes all temporary views.
#'
#' @param databaseName (optional) name of the database
#' The database name can be qualified with catalog name since 3.4.0.
#' @return a SparkDataFrame of the list of tables.
#' @rdname listTables
#' @name listTables
@@ -386,7 +455,7 @@ listDatabases <- function() {
#' \dontrun{
#' sparkR.session()
#' listTables()
#' listTables("default")
#' listTables("spark_catalog.default")
#' }
#' @note since 2.2.0
listTables <- function(databaseName = NULL) {
@@ -403,21 +472,95 @@ listTables <- function(databaseName = NULL) {
dataFrame(callJMethod(jdst, "toDF"))
}

#' Checks if the table with the specified name exists.
#'
#' Checks if the table with the specified name exists.
#'
#' @param tableName name of the table, allowed to be qualified with catalog name
#' @rdname tableExists
#' @name tableExists
#' @examples
#' \dontrun{
#' sparkR.session()
#' tableExists("spark_catalog.default.myTable")
#' }
#' @note since 3.4.0
tableExists <- function(tableName) {
sparkSession <- getSparkSession()
if (class(tableName) != "character") {
stop("tableName must be a string.")
}
catalog <- callJMethod(sparkSession, "catalog")
callJMethod(catalog, "tableExists", tableName)
}

#' Get the table with the specified name
#'
#' Get the table with the specified name
#'
#' @param tableName the qualified or unqualified name that designates a table, allowed to be
#' qualified with catalog name
#' @return A named list.
#' @rdname getTable
#' @name getTable
#' @examples
#' \dontrun{
#' sparkR.session()
#' tbl <- getTable("spark_catalog.default.myTable")
#' }
#' @note since 3.4.0
getTable <- function(tableName) {
sparkSession <- getSparkSession()
if (class(tableName) != "character") {
stop("tableName must be a string.")
}
catalog <- callJMethod(sparkSession, "catalog")
jtbl <- handledCallJMethod(catalog, "getTable", tableName)

ret <- list(name = callJMethod(jtbl, "name"))
jcata <- callJMethod(jtbl, "catalog")
if (is.null(jcata)) {
ret$catalog <- NA
} else {
ret$catalog <- jcata
}

jns <- callJMethod(jtbl, "namespace")
if (is.null(jns)) {
ret$namespace <- NA
} else {
ret$namespace <- jns
}

jdesc <- callJMethod(jtbl, "description")
if (is.null(jdesc)) {
ret$description <- NA
} else {
ret$description <- jdesc
}

ret$tableType <- callJMethod(jtbl, "tableType")
ret$isTemporary <- callJMethod(jtbl, "isTemporary")
ret
}

#' Returns a list of columns for the given table/view in the specified database
#'
#' Returns a list of columns for the given table/view in the specified database.
#'
#' @param tableName the qualified or unqualified name that designates a table/view. If no database
#' identifier is provided, it refers to a table/view in the current database.
#' If \code{databaseName} parameter is specified, this must be an unqualified name.
#' The table name can be qualified with catalog name since 3.4.0, when databaseName
#' is NULL.
#' @param databaseName (optional) name of the database
#' @return a SparkDataFrame of the list of column descriptions.
#' @rdname listColumns
#' @name listColumns
#' @examples
#' \dontrun{
#' sparkR.session()
#' listColumns("mytable")
#' listColumns("spark_catalog.default.mytable")
#' }
#' @note since 2.2.0
listColumns <- function(tableName, databaseName = NULL) {
Expand Down Expand Up @@ -470,12 +613,13 @@ listFunctions <- function(databaseName = NULL) {
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' The table name can be fully qualified with catalog name since 3.4.0.
#' @rdname recoverPartitions
#' @name recoverPartitions
#' @examples
#' \dontrun{
#' sparkR.session()
#' recoverPartitions("myTable")
#' recoverPartitions("spark_catalog.default.myTable")
#' }
#' @note since 2.2.0
recoverPartitions <- function(tableName) {
Expand All @@ -496,12 +640,13 @@ recoverPartitions <- function(tableName) {
#'
#' @param tableName the qualified or unqualified name that designates a table. If no database
#' identifier is provided, it refers to a table in the current database.
#' The table name can be fully qualified with catalog name since 3.4.0.
#' @rdname refreshTable
#' @name refreshTable
#' @examples
#' \dontrun{
#' sparkR.session()
#' refreshTable("myTable")
#' refreshTable("spark_catalog.default.myTable")
#' }
#' @note since 2.2.0
refreshTable <- function(tableName) {
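For orientation, a minimal SparkR sketch exercising the catalog APIs added in this file (assumes a running session; `myTable` is the placeholder name from the roxygen examples above):

```r
library(SparkR)
sparkR.session()

# Probe for a database; the name may be qualified with a catalog name.
databaseExists("spark_catalog.default")           # TRUE on a default build

# Fetch database metadata as a named list: name, catalog, description, locationUri.
db <- getDatabase("default")
db$locationUri

# Check that a table exists before fetching its metadata.
if (tableExists("spark_catalog.default.myTable")) {
  tbl <- getTable("spark_catalog.default.myTable")
  tbl$tableType                                   # e.g. "MANAGED"
  tbl$isTemporary
}
```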
6 changes: 4 additions & 2 deletions R/pkg/R/functions.R
@@ -3256,7 +3256,8 @@ setMethod("format_string", signature(format = "character", x = "Column"),
#' tmp <- mutate(df, to_unix = unix_timestamp(df$time),
#' to_unix2 = unix_timestamp(df$time, 'yyyy-MM-dd HH'),
#' from_unix = from_unixtime(unix_timestamp(df$time)),
#' from_unix2 = from_unixtime(unix_timestamp(df$time), 'yyyy-MM-dd HH:mm'))
#' from_unix2 = from_unixtime(unix_timestamp(df$time), 'yyyy-MM-dd HH:mm'),
#' timestamp_from_unix = timestamp_seconds(unix_timestamp(df$time)))
#' head(tmp)}
#' @note from_unixtime since 1.5.0
setMethod("from_unixtime", signature(x = "Column"),
@@ -4854,7 +4855,8 @@ setMethod("current_timestamp",
})

#' @details
#' \code{timestamp_seconds}: Creates timestamp from the number of seconds since UTC epoch.
#' \code{timestamp_seconds}: Converts the number of seconds from the Unix epoch
#' (1970-01-01T00:00:00Z) to a timestamp.
#'
#' @rdname column_datetime_functions
#' @aliases timestamp_seconds timestamp_seconds,Column-method
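A small sketch of the behavior described by the reworded `timestamp_seconds` docs (assumes a running SparkR session; the column name `t` is made up for illustration):

```r
library(SparkR)
sparkR.session()

# 86400 seconds after the Unix epoch (1970-01-01T00:00:00Z) is exactly
# 1970-01-02T00:00:00Z; the result is rendered in the session time zone.
df <- createDataFrame(data.frame(t = 86400))
head(select(df, alias(timestamp_seconds(df$t), "ts")))
```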
4 changes: 4 additions & 0 deletions R/pkg/pkgdown/_pkgdown_template.yml
@@ -263,8 +263,11 @@ reference:
- contents:
- currentCatalog
- currentDatabase
- databaseExists
- dropTempTable
- dropTempView
- getDatabase
- getTable
- listCatalogs
- listColumns
- listDatabases
@@ -275,6 +278,7 @@ reference:
- recoverPartitions
- setCurrentCatalog
- setCurrentDatabase
- tableExists
- tableNames
- tables
- uncacheTable