Skip to content

Commit

Permalink
Adds setLocalProperty and getLocalProperty
Browse files Browse the repository at this point in the history
  • Loading branch information
HyukjinKwon committed Dec 26, 2017
1 parent 0e68330 commit 650a486
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 2 deletions.
4 changes: 3 additions & 1 deletion R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ exportMethods("glm",
export("setJobGroup",
"clearJobGroup",
"cancelJobGroup",
"setJobDescription")
"setJobDescription",
"setLocalProperty",
"getLocalProperty")

# Export Utility methods
export("setLogLevel")
Expand Down
45 changes: 45 additions & 0 deletions R/pkg/R/sparkR.R
Original file line number Diff line number Diff line change
Expand Up @@ -561,9 +561,54 @@ cancelJobGroup <- function(sc, groupId) {
#' @note setJobDescription since 2.3.0
setJobDescription <- function(value) {
sc <- getSparkContext()
if (!is.null(value)) {
value <- as.character(value)
}
invisible(callJMethod(sc, "setJobDescription", value))
}

#' Set a local property that affects jobs submitted from this thread, such as the
#' Spark fair scheduler pool.
#'
#' @param key The key for a local property.
#' @param value The value for a local property.
#' @rdname setLocalProperty
#' @name setLocalProperty
#' @examples
#'\dontrun{
#' setLocalProperty("spark.scheduler.pool", "poolA")
#'}
#' @note setLocalProperty since 2.3.0
setLocalProperty <- function(key, value) {
sc <- getSparkContext()
if (is.null(key)) {
stop("key should not be NULL.")
}
if (!is.null(value)) {
value <- as.character(value)
}
invisible(callJMethod(sc, "setLocalProperty", as.character(key), value))
}

#' Get a local property set in this thread, or \code{NULL} if it is missing. See
#' \code{setLocalProperty}.
#'
#' @param key The key for a local property.
#' @rdname getLocalProperty
#' @name getLocalProperty
#' @examples
#'\dontrun{
#' getLocalProperty("spark.scheduler.pool")
#'}
#' @note getLocalProperty since 2.3.0
getLocalProperty <- function(key) {
sc <- getSparkContext()
if (is.null(key)) {
stop("key should not be NULL.")
}
invisible(callJMethod(sc, "getLocalProperty", as.character(key)))
}

sparkConfToSubmitOps <- new.env()
sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory"
sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path"
Expand Down
25 changes: 24 additions & 1 deletion R/pkg/tests/fulltests/test_context.R
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,37 @@ test_that("job group functions can be called", {
setJobGroup("groupId", "job description", TRUE)
cancelJobGroup("groupId")
clearJobGroup()
setJobDescription("job description")

suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE))
suppressWarnings(cancelJobGroup(sc, "groupId"))
suppressWarnings(clearJobGroup(sc))
sparkR.session.stop()
})

test_that("job description and local properties can be set and got", {
sc <- sparkR.sparkContext(master = sparkRTestMaster)
setJobDescription("job description")
expect_equal(getLocalProperty("spark.job.description"), "job description")
setJobDescription(1234)
expect_equal(getLocalProperty("spark.job.description"), "1234")
setJobDescription(NULL)
expect_equal(getLocalProperty("spark.job.description"), NULL)

setLocalProperty("spark.scheduler.pool", "poolA")
expect_equal(getLocalProperty("spark.scheduler.pool"), "poolA")
setLocalProperty("spark.scheduler.pool", NULL)
expect_equal(getLocalProperty("spark.scheduler.pool"), NULL)

setLocalProperty(4321, 1234)
expect_equal(getLocalProperty(4321), "1234")
setLocalProperty(4321, NULL)
expect_equal(getLocalProperty(4321), NULL)

expect_error(setLocalProperty(NULL, "should fail"), "key should not be NULL.")
expect_error(getLocalProperty(NULL), "key should not be NULL.")
sparkR.session.stop()
})

test_that("utility function can be called", {
sparkR.sparkContext(master = sparkRTestMaster)
setLogLevel("ERROR")
Expand Down

0 comments on commit 650a486

Please sign in to comment.