
Merge branch 'master' into optimize_explode
uzadude authored Dec 7, 2017
2 parents b825d6b + 8ae004b commit 04b5814
Showing 572 changed files with 20,440 additions and 12,427 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -43,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
6 changes: 6 additions & 0 deletions NOTICE
@@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc.
Apache Commons Pool
Copyright 1999-2009 The Apache Software Foundation

This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client)
Copyright (C) 2015 Red Hat, Inc.

This product includes/uses OkHttp (https://github.com/square/okhttp)
Copyright (C) 2012 The Android Open Source Project

=========================================================================
== NOTICE file corresponding to section 4(d) of the Apache License, ==
== Version 2.0, in this case for the DataNucleus distribution. ==
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -232,6 +232,7 @@ exportMethods("%<=>%",
"date_sub",
"datediff",
"dayofmonth",
"dayofweek",
"dayofyear",
"decode",
"dense_rank",
90 changes: 53 additions & 37 deletions R/pkg/R/DataFrame.R
@@ -58,14 +58,23 @@ setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
#' Set options/mode and then return the write object
#' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
jmode <- convertToJSaveMode(mode)
write <- callJMethod(write, "mode", jmode)
write <- callJMethod(write, "options", options)
write
options <- varargsToStrEnv(...)
if (!is.null(path)) {
options[["path"]] <- path
}
write <- setWriteMode(write, mode)
write <- callJMethod(write, "options", options)
write
}

#' Set mode and then return the write object
#' @noRd
setWriteMode <- function(write, mode) {
if (!is.character(mode)) {
stop("mode should be character or omitted. It is 'error' by default.")
}
write <- handledCallJMethod(write, "mode", mode)
write
}

#' @export
@@ -556,9 +565,8 @@ setMethod("registerTempTable",
setMethod("insertInto",
signature(x = "SparkDataFrame", tableName = "character"),
function(x, tableName, overwrite = FALSE) {
jmode <- convertToJSaveMode(ifelse(overwrite, "overwrite", "append"))
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
write <- setWriteMode(write, ifelse(overwrite, "overwrite", "append"))
invisible(callJMethod(write, "insertInto", tableName))
})

@@ -810,7 +818,8 @@ setMethod("toJSON",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
@@ -841,7 +850,8 @@ setMethod("write.json",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
@@ -872,7 +882,8 @@ setMethod("write.orc",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
@@ -917,7 +928,8 @@ setMethod("saveAsParquetFile",
#'
#' @param x A SparkDataFrame
#' @param path The directory where the file is saved
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
@@ -2871,18 +2883,19 @@ setMethod("except",
#' Additionally, mode is used to specify the behavior of the save operation when data already
#' exists in the data source. There are four modes:
#' \itemize{
#' \item append: Contents of this SparkDataFrame are expected to be appended to existing data.
#' \item overwrite: Existing data is expected to be overwritten by the contents of this
#' \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
#' \item 'overwrite': Existing data is expected to be overwritten by the contents of this
#' SparkDataFrame.
#' \item error: An exception is expected to be thrown.
#' \item ignore: The save operation is expected to not save the contents of the SparkDataFrame
#' \item 'error' or 'errorifexists': An exception is expected to be thrown.
#' \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
#' and to not change the existing data.
#' }
#'
#' @param df a SparkDataFrame.
#' @param path a name for the table.
#' @param source a name for external data source.
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default)
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
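
Illustration (not part of the diff): a minimal SparkR sketch of the expanded save-mode handling documented above, assuming a running SparkSession; the data frame, path, and outputs are hypothetical.

df <- createDataFrame(mtcars)
out <- tempfile()
# 'errorifexists' is now accepted as a synonym for the default 'error' mode
write.df(df, path = out, source = "parquet", mode = "errorifexists")
# a non-character mode now fails fast on the R side:
# write.df(df, path = out, source = "parquet", mode = TRUE)
# Error: mode should be character or omitted. It is 'error' by default.
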
@@ -2940,17 +2953,18 @@ setMethod("saveDF",
#'
#' Additionally, mode is used to specify the behavior of the save operation when
#' data already exists in the data source. There are four modes: \cr
#' append: Contents of this SparkDataFrame are expected to be appended to existing data. \cr
#' overwrite: Existing data is expected to be overwritten by the contents of this
#' 'append': Contents of this SparkDataFrame are expected to be appended to existing data. \cr
#' 'overwrite': Existing data is expected to be overwritten by the contents of this
#' SparkDataFrame. \cr
#' error: An exception is expected to be thrown. \cr
#' ignore: The save operation is expected to not save the contents of the SparkDataFrame
#' 'error' or 'errorifexists': An exception is expected to be thrown. \cr
#' 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
#' and to not change the existing data. \cr
#'
#' @param df a SparkDataFrame.
#' @param tableName a name for the table.
#' @param source a name for external data source.
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param ... additional option(s) passed to the method.
#'
#' @family SparkDataFrame functions
@@ -2972,12 +2986,11 @@ setMethod("saveAsTable",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
jmode <- convertToJSaveMode(mode)
options <- varargsToStrEnv(...)

write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
write <- callJMethod(write, "mode", jmode)
write <- setWriteMode(write, mode)
write <- callJMethod(write, "options", options)
invisible(callJMethod(write, "saveAsTable", tableName))
})
@@ -3236,7 +3249,7 @@ setMethod("as.data.frame",
#'
#' @family SparkDataFrame functions
#' @rdname attach
#' @aliases attach,SparkDataFrame-method
#' @aliases attach attach,SparkDataFrame-method
#' @param what (SparkDataFrame) The SparkDataFrame to attach
#' @param pos (integer) Specify position in search() where to attach.
#' @param name (character) Name to use for the attached SparkDataFrame. Names
@@ -3252,9 +3265,12 @@ setMethod("as.data.frame",
#' @note attach since 1.6.0
setMethod("attach",
signature(what = "SparkDataFrame"),
function(what, pos = 2, name = deparse(substitute(what)), warn.conflicts = TRUE) {
newEnv <- assignNewEnv(what)
attach(newEnv, pos = pos, name = name, warn.conflicts = warn.conflicts)
function(what, pos = 2L, name = deparse(substitute(what), backtick = FALSE),
warn.conflicts = TRUE) {
args <- as.list(environment()) # capture all parameters - this must be the first line
newEnv <- assignNewEnv(args$what)
args$what <- newEnv
do.call(attach, args)
})
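
Illustration (not part of the diff): a hypothetical session showing the attach/detach round trip this rewrite keeps working; the column names are made up.

df <- createDataFrame(data.frame(Sepal_Width = c(3.5, 3.0), Species = c("setosa", "virginica")))
attach(df)               # the default name is derived from the unquoted symbol, i.e. "df"
head(Sepal_Width)        # columns now resolve against the attached SparkDataFrame
detach("df")
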

#' Evaluate a R expression in an environment constructed from a SparkDataFrame
@@ -3541,18 +3557,19 @@ setMethod("histogram",
#' Also, mode is used to specify the behavior of the save operation when
#' data already exists in the data source. There are four modes:
#' \itemize{
#' \item append: Contents of this SparkDataFrame are expected to be appended to existing data.
#' \item overwrite: Existing data is expected to be overwritten by the contents of this
#' \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
#' \item 'overwrite': Existing data is expected to be overwritten by the contents of this
#' SparkDataFrame.
#' \item error: An exception is expected to be thrown.
#' \item ignore: The save operation is expected to not save the contents of the SparkDataFrame
#' \item 'error' or 'errorifexists': An exception is expected to be thrown.
#' \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
#' and to not change the existing data.
#' }
#'
#' @param x a SparkDataFrame.
#' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}.
#' @param tableName the name of the table in the external database.
#' @param mode one of 'append', 'overwrite', 'error', 'ignore' save mode (it is 'error' by default).
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param ... additional JDBC database connection properties.
#' @family SparkDataFrame functions
#' @rdname write.jdbc
@@ -3569,10 +3586,9 @@ setMethod("histogram",
setMethod("write.jdbc",
signature(x = "SparkDataFrame", url = "character", tableName = "character"),
function(x, url, tableName, mode = "error", ...) {
jmode <- convertToJSaveMode(mode)
jprops <- varargsToJProperties(...)
write <- callJMethod(x@sdf, "write")
write <- callJMethod(write, "mode", jmode)
write <- setWriteMode(write, mode)
invisible(handledCallJMethod(write, "jdbc", url, tableName, jprops))
})

17 changes: 16 additions & 1 deletion R/pkg/R/functions.R
@@ -696,7 +696,7 @@ setMethod("hash",
#'
#' \dontrun{
#' head(select(df, df$time, year(df$time), quarter(df$time), month(df$time),
#' dayofmonth(df$time), dayofyear(df$time), weekofyear(df$time)))
#' dayofmonth(df$time), dayofweek(df$time), dayofyear(df$time), weekofyear(df$time)))
#' head(agg(groupBy(df, year(df$time)), count(df$y), avg(df$y)))
#' head(agg(groupBy(df, month(df$time)), avg(df$y)))}
#' @note dayofmonth since 1.5.0
@@ -707,6 +707,21 @@ setMethod("dayofmonth",
column(jc)
})

#' @details
#' \code{dayofweek}: Extracts the day of the week as an integer from a
#' given date/timestamp/string.
#'
#' @rdname column_datetime_functions
#' @aliases dayofweek dayofweek,Column-method
#' @export
#' @note dayofweek since 2.3.0
setMethod("dayofweek",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "dayofweek", x@jc)
column(jc)
})

#' @details
#' \code{dayofyear}: Extracts the day of the year as an integer from a
#' given date/timestamp/string.
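
Illustration (not part of the diff): a hypothetical use of the new dayofweek function; Spark SQL numbers the days 1 = Sunday through 7 = Saturday.

df <- createDataFrame(data.frame(time = as.Date("2017-12-07")))
head(select(df, dayofweek(df$time)))   # 2017-12-07 is a Thursday, so the value is 5
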
15 changes: 9 additions & 6 deletions R/pkg/R/generics.R
@@ -409,7 +409,8 @@ setGeneric("as.data.frame",
standardGeneric("as.data.frame")
})

#' @rdname attach
# Do not document the generic because of signature changes across R versions
#' @noRd
#' @export
setGeneric("attach")

@@ -1047,6 +1048,11 @@ setGeneric("date_sub", function(y, x) { standardGeneric("date_sub") })
#' @name NULL
setGeneric("dayofmonth", function(x) { standardGeneric("dayofmonth") })

#' @rdname column_datetime_functions
#' @export
#' @name NULL
setGeneric("dayofweek", function(x) { standardGeneric("dayofweek") })

#' @rdname column_datetime_functions
#' @export
#' @name NULL
@@ -1569,12 +1575,9 @@ setGeneric("year", function(x) { standardGeneric("year") })
#' @export
setGeneric("fitted")

#' @param x,y For \code{glm}: logical values indicating whether the response vector
#' and model matrix used in the fitting process should be returned as
#' components of the returned value.
#' @inheritParams stats::glm
#' @rdname glm
# Do not carry stats::glm usage and param here, and do not document the generic
#' @export
#' @noRd
setGeneric("glm")

#' @param object a fitted ML model object.
37 changes: 36 additions & 1 deletion R/pkg/R/install.R
@@ -152,6 +152,11 @@ install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
})
if (!tarExists || overwrite || !success) {
unlink(packageLocalPath)
if (success) {
# if tar file was not there before (or it was, but we are told to overwrite it),
# and untar is successful - set a flag that we have downloaded (and untarred) the Spark package.
assign(".sparkDownloaded", TRUE, envir = .sparkREnv)
}
}
if (!success) stop("Extract archive failed.")
message("DONE.")
@@ -266,6 +271,7 @@ hadoopVersionName <- function(hadoopVersion) {

# The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
# adapt to Spark context
# see also sparkCacheRelPathLength()
sparkCachePath <- function() {
if (is_windows()) {
winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
@@ -282,7 +288,7 @@
}
} else if (.Platform$OS.type == "unix") {
if (Sys.info()["sysname"] == "Darwin") {
path <- file.path(Sys.getenv("HOME"), "Library/Caches", "spark")
path <- file.path(Sys.getenv("HOME"), "Library", "Caches", "spark")
} else {
path <- file.path(
Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
@@ -293,6 +299,16 @@
normalizePath(path, mustWork = FALSE)
}

# Length of the Spark cache specific relative path segments for each platform
# eg. "Apache\Spark\Cache" is 3 in Windows, or "spark" is 1 in unix
# Must match sparkCachePath() exactly.
sparkCacheRelPathLength <- function() {
if (is_windows()) {
3
} else {
1
}
}

installInstruction <- function(mode) {
if (mode == "remote") {
@@ -310,3 +326,22 @@
stop(paste0("No instruction found for ", mode, " mode."))
}
}

uninstallDownloadedSpark <- function() {
# clean up if Spark was downloaded
sparkDownloaded <- getOne(".sparkDownloaded",
envir = .sparkREnv,
inherits = TRUE,
ifnotfound = FALSE)
sparkDownloadedDir <- Sys.getenv("SPARK_HOME")
if (sparkDownloaded && nchar(sparkDownloadedDir) > 0) {
unlink(sparkDownloadedDir, recursive = TRUE, force = TRUE)

dirs <- traverseParentDirs(sparkCachePath(), sparkCacheRelPathLength())
lapply(dirs, function(d) {
if (length(list.files(d, all.files = TRUE, include.dirs = TRUE, no.. = TRUE)) == 0) {
unlink(d, recursive = TRUE, force = TRUE)
}
})
}
}
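
Illustration (not part of the diff): a standalone sketch of the cleanup idea behind uninstallDownloadedSpark() - delete the downloaded Spark directory, then walk up through at most sparkCacheRelPathLength() cache-specific parent directories (3 on Windows, 1 elsewhere) and remove each one only if it is already empty. The helper name below is made up and is not a SparkR internal.

removeDirAndEmptyCacheParents <- function(dir, relPathLength) {
  unlink(dir, recursive = TRUE, force = TRUE)     # drop the unpacked Spark build
  parent <- dirname(dir)
  for (i in seq_len(relPathLength)) {             # never climb above the cache-specific segments
    if (length(list.files(parent, all.files = TRUE, include.dirs = TRUE, no.. = TRUE)) > 0) break
    unlink(parent, recursive = TRUE, force = TRUE)
    parent <- dirname(parent)
  }
}
# e.g. on macOS: removeDirAndEmptyCacheParents(Sys.getenv("SPARK_HOME"), 1)
# removes SPARK_HOME and then ~/Library/Caches/spark, but only if the latter is empty.
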
1 change: 1 addition & 0 deletions R/pkg/R/mllib_regression.R
@@ -210,6 +210,7 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
#' 1.0.
#' @return \code{glm} returns a fitted generalized linear model.
#' @rdname glm
#' @aliases glm
#' @export
#' @examples
#' \dontrun{
(Diff truncated: the remaining changed files are not shown here.)
