[SPARK-14557][SQL]: Resolving merge conflicts
kasjain committed Apr 22, 2016
2 parents 6bd529c + 8012793 commit ca9a160
Showing 1,357 changed files with 24,686 additions and 15,033 deletions.
9 changes: 8 additions & 1 deletion R/pkg/NAMESPACE
@@ -24,6 +24,9 @@ export("setJobGroup",
"clearJobGroup",
"cancelJobGroup")

# Export Utility methods
export("setLogLevel")

exportClasses("DataFrame")

exportMethods("arrange",
@@ -101,6 +104,7 @@ exportMethods("arrange",
"withColumn",
"withColumnRenamed",
"write.df",
"write.jdbc",
"write.json",
"write.parquet",
"write.text")
@@ -125,6 +129,7 @@ exportMethods("%in%",
"between",
"bin",
"bitwiseNOT",
"bround",
"cast",
"cbrt",
"ceil",
@@ -284,6 +289,7 @@ export("as.DataFrame",
"loadDF",
"parquetFile",
"read.df",
"read.jdbc",
"read.json",
"read.parquet",
"read.text",
@@ -292,7 +298,8 @@ export("as.DataFrame",
"tableToDF",
"tableNames",
"tables",
"uncacheTable")
"uncacheTable",
"print.summary.GeneralizedLinearRegressionModel")

export("structField",
"structField.jobj",
47 changes: 40 additions & 7 deletions R/pkg/R/DataFrame.R
@@ -2296,12 +2296,8 @@ setMethod("fillna",
#' }
setMethod("as.data.frame",
signature(x = "DataFrame"),
function(x, ...) {
# Check if additional parameters have been passed
if (length(list(...)) > 0) {
stop(paste("Unused argument(s): ", paste(list(...), collapse = ", ")))
}
collect(x)
function(x, row.names = NULL, optional = FALSE, ...) {
as.data.frame(collect(x), row.names, optional, ...)
})

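The rewritten method above forwards `row.names`, `optional`, and any extra arguments to base R's `as.data.frame` after collecting, instead of rejecting them. A minimal usage sketch (assumes a running SparkR session; `sqlContext` and the toy data are illustrative, not part of this commit):

df <- createDataFrame(sqlContext, data.frame(a = 1:3, b = c("x", "y", "z")))

# Extra arguments such as row.names previously triggered "Unused argument(s)";
# they are now passed through to base::as.data.frame on the collected data.
localDF <- as.data.frame(df, row.names = c("r1", "r2", "r3"))
str(localDF)
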
#' The specified DataFrame is attached to the R search path. This means that
@@ -2363,7 +2359,7 @@ setMethod("with",
#' @examples \dontrun{
#' # Create a DataFrame from the Iris dataset
#' irisDF <- createDataFrame(sqlContext, iris)
#'
#'
#' # Show the structure of the DataFrame
#' str(irisDF)
#' }
@@ -2468,3 +2464,40 @@ setMethod("drop",
function(x) {
base::drop(x)
})

#' Saves the content of the DataFrame to an external database table via JDBC
#'
#' Additional JDBC database connection properties can be set (...)
#'
#' Also, mode is used to specify the behavior of the save operation when
#' data already exists in the data source. There are four modes: \cr
#' append: Contents of this DataFrame are expected to be appended to existing data. \cr
#' overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
#' error: An exception is expected to be thrown. \cr
#' ignore: The save operation is expected to not save the contents of the DataFrame
#' and to not change the existing data. \cr
#'
#' @param x A SparkSQL DataFrame
#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
#' @param tableName The name of the table in the external database
#' @param mode One of 'append', 'overwrite', 'error', or 'ignore' save modes ('error' by default)
#' @family DataFrame functions
#' @rdname write.jdbc
#' @name write.jdbc
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
#' write.jdbc(df, jdbcUrl, "table", user = "username", password = "password")
#' }
setMethod("write.jdbc",
signature(x = "DataFrame", url = "character", tableName = "character"),
function(x, url, tableName, mode = "error", ...){
jmode <- convertToJSaveMode(mode)
jprops <- varargsToJProperties(...)
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
invisible(callJMethod(write, "jdbc", url, tableName, jprops))
})
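Because the varargs are converted into a Java Properties object, arbitrary JDBC connection options ride along with the save. A hedged sketch of calling the new method (the URL, table name, driver class, and credentials below are placeholders, not part of this commit):

jdbcUrl <- "jdbc:postgresql://localhost:5432/mydb"

# mode = "overwrite" replaces any existing table contents; the remaining
# named arguments become entries in the JDBC connection Properties.
write.jdbc(df, jdbcUrl, "people_copy", mode = "overwrite",
           user = "username", password = "password",
           driver = "org.postgresql.Driver")
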
58 changes: 58 additions & 0 deletions R/pkg/R/SQLContext.R
@@ -583,3 +583,61 @@ createExternalTable <- function(sqlContext, tableName, path = NULL, source = NUL
sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
dataFrame(sdf)
}

#' Create a DataFrame representing the database table accessible via JDBC URL
#'
#' Additional JDBC database connection properties can be set (...)
#'
#' Only one of partitionColumn or predicates should be set. Partitions of the table will be
#' retrieved in parallel based on the `numPartitions` or by the predicates.
#'
#' Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
#' your external database systems.
#'
#' @param sqlContext SQLContext to use
#' @param url JDBC database url of the form `jdbc:subprotocol:subname`
#' @param tableName the name of the table in the external database
#' @param partitionColumn the name of a column of integral type that will be used for partitioning
#' @param lowerBound the minimum value of `partitionColumn` used to decide partition stride
#' @param upperBound the maximum value of `partitionColumn` used to decide partition stride
#' @param numPartitions the number of partitions. This, along with `lowerBound` (inclusive) and
#' `upperBound` (exclusive), forms partition strides for generated WHERE
#' clause expressions used to split the column `partitionColumn` evenly.
#' This defaults to SparkContext.defaultParallelism when unset.
#' @param predicates a list of conditions in the where clause; each one defines one partition
#' @return DataFrame
#' @rdname read.jdbc
#' @name read.jdbc
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
#' df <- read.jdbc(sqlContext, jdbcUrl, "table", predicates = list("field<=123"), user = "username")
#' df2 <- read.jdbc(sqlContext, jdbcUrl, "table2", partitionColumn = "index", lowerBound = 0,
#' upperBound = 10000, user = "username", password = "password")
#' }

read.jdbc <- function(sqlContext, url, tableName,
partitionColumn = NULL, lowerBound = NULL, upperBound = NULL,
numPartitions = 0L, predicates = list(), ...) {
jprops <- varargsToJProperties(...)

read <- callJMethod(sqlContext, "read")
if (!is.null(partitionColumn)) {
if (is.null(numPartitions) || numPartitions == 0) {
sc <- callJMethod(sqlContext, "sparkContext")
numPartitions <- callJMethod(sc, "defaultParallelism")
} else {
numPartitions <- numToInt(numPartitions)
}
sdf <- callJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
} else if (length(predicates) > 0) {
sdf <- callJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)), jprops)
} else {
sdf <- callJMethod(read, "jdbc", url, tableName, jprops)
}
dataFrame(sdf)
}
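The three branches above correspond to a range-partitioned read (`partitionColumn` with bounds), a predicate-based read, and a plain single-partition read. A sketch contrasting the first two (the URL, table, column names, and credentials are placeholders):

jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"

# Range-partitioned read: rows are split into 4 strides of `id` between the
# bounds, so 4 JDBC queries run in parallel. Leaving numPartitions at 0 (the
# default) falls back to SparkContext.defaultParallelism instead.
df1 <- read.jdbc(sqlContext, jdbcUrl, "events",
                 partitionColumn = "id", lowerBound = 0, upperBound = 100000,
                 numPartitions = 4, user = "username", password = "password")

# Predicate-based read: one partition per WHERE-clause condition.
df2 <- read.jdbc(sqlContext, jdbcUrl, "events",
                 predicates = list("region = 'EU'", "region = 'US'"),
                 user = "username", password = "password")
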
17 changes: 17 additions & 0 deletions R/pkg/R/context.R
@@ -225,3 +225,20 @@ broadcast <- function(sc, object) {
setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}

#' Set new log level
#'
#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
#'
#' @rdname setLogLevel
#' @param sc Spark Context to use
#' @param level New log level
#' @export
#' @examples
#'\dontrun{
#' setLogLevel(sc, "ERROR")
#'}

setLogLevel <- function(sc, level) {
callJMethod(sc, "setLogLevel", level)
}
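The wrapper simply forwards to the JVM SparkContext's setLogLevel. A minimal sketch of toning down console output in an interactive session (the level names are those listed in the roxygen comment above):

sc <- sparkR.init()
setLogLevel(sc, "WARN")   # silence INFO-level job progress messages
# ... run some jobs ...
setLogLevel(sc, "INFO")   # turn verbose logging back on when debugging
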
22 changes: 21 additions & 1 deletion R/pkg/R/functions.R
@@ -994,7 +994,7 @@ setMethod("rint",

#' round
#'
#' Returns the value of the column `e` rounded to 0 decimal places.
#' Returns the value of the column `e` rounded to 0 decimal places using HALF_UP rounding mode.
#'
#' @rdname round
#' @name round
@@ -1008,6 +1008,26 @@ setMethod("round",
column(jc)
})

#' bround
#'
#' Returns the value of the column `e` rounded to `scale` decimal places using HALF_EVEN rounding
#' mode if `scale` >= 0 or at integral part when `scale` < 0.
#' Also known as Gaussian rounding or bankers' rounding, which rounds to the nearest even number.
#' bround(2.5, 0) = 2, bround(3.5, 0) = 4.
#'
#' @rdname bround
#' @name bround
#' @family math_funcs
#' @export
#' @examples \dontrun{bround(df$c, 0)}
setMethod("bround",
signature(x = "Column"),
function(x, scale = 0) {
jc <- callJStatic("org.apache.spark.sql.functions", "bround", x@jc, as.integer(scale))
column(jc)
})
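A small sketch contrasting the two rounding modes on half-way values (assumes SparkR's `lit` and a throwaway one-row DataFrame; the names are illustrative only):

df <- createDataFrame(sqlContext, data.frame(dummy = 1))

rounded <- select(df,
                  alias(round(lit(2.5)), "half_up"),     # HALF_UP   -> 3
                  alias(bround(lit(2.5)), "half_even"),  # HALF_EVEN -> 2
                  alias(bround(lit(3.5)), "half_even2")) # HALF_EVEN -> 4
showDF(rounded)
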


#' rtrim
#'
#' Trim the spaces from right end for the specified string value.
15 changes: 14 additions & 1 deletion R/pkg/R/generics.R
@@ -397,7 +397,10 @@ setGeneric("arrange", function(x, col, ...) { standardGeneric("arrange") })

#' @rdname as.data.frame
#' @export
setGeneric("as.data.frame")
setGeneric("as.data.frame",
function(x, row.names = NULL, optional = FALSE, ...) {
standardGeneric("as.data.frame")
})

#' @rdname attach
#' @export
@@ -577,6 +580,12 @@ setGeneric("saveDF", function(df, path, source = NULL, mode = "error", ...) {
standardGeneric("saveDF")
})

#' @rdname write.jdbc
#' @export
setGeneric("write.jdbc", function(x, url, tableName, mode = "error", ...) {
standardGeneric("write.jdbc")
})

#' @rdname write.json
#' @export
setGeneric("write.json", function(x, path) { standardGeneric("write.json") })
@@ -751,6 +760,10 @@ setGeneric("bin", function(x) { standardGeneric("bin") })
#' @export
setGeneric("bitwiseNOT", function(x) { standardGeneric("bitwiseNOT") })

#' @rdname bround
#' @export
setGeneric("bround", function(x, ...) { standardGeneric("bround") })

#' @rdname cbrt
#' @export
setGeneric("cbrt", function(x) { standardGeneric("cbrt") })