[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy #20129

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -179,6 +179,7 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
"withWatermark",
"write.df",
"write.jdbc",
"write.json",
96 changes: 92 additions & 4 deletions R/pkg/R/DataFrame.R
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
#' isStreaming
#'
#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
#' as it arrives.
#' as it arrives. A dataset that reads data from a streaming source must be executed as a
#' \code{StreamingQuery} using \code{write.stream}.
#'
#' @param x A SparkDataFrame
#' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
#' @param df a streaming SparkDataFrame.
#' @param source a name for external data source.
#' @param outputMode one of 'append', 'complete', 'update'.
#' @param ... additional argument(s) passed to the method.
#' @param partitionBy a name or a list of names of columns to partition the output by on the file
#' system. If specified, the output is laid out on the file system similar to Hive's
#' partitioning scheme.
#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
#' '1 minute'. This is a trigger that runs a query periodically based on the processing
#' time. If the value is '0 seconds', the query will run as fast as possible; this is the
#' default. Only one trigger can be set.
#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
#' one batch of data in a streaming query then terminates the query. Only one trigger can be
#' set.
#' @param ... additional external data source specific named options.
#'
#' @family SparkDataFrame functions
#' @seealso \link{read.stream}
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
#' # console
#' q <- write.stream(wordCounts, "console", outputMode = "complete")
#' # text stream
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp"
#' partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
#' # memory stream
#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
#' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("write.stream",
#' @note experimental
setMethod("write.stream",
signature(df = "SparkDataFrame"),
function(df, source = NULL, outputMode = NULL, ...) {
function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
trigger.processingTime = NULL, trigger.once = NULL, ...) {
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the data source specified ",
"in 'spark.sql.sources.default' configuration by default.")
@@ -3748,12 +3761,43 @@ setMethod("write.stream",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
cols <- NULL
if (!is.null(partitionBy)) {
if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
stop("All partitionBy column names should be characters.")
}
cols <- as.list(partitionBy)
}
jtrigger <- NULL
if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
if (!is.null(trigger.once)) {
stop("Multiple triggers not allowed.")
}
interval <- as.character(trigger.processingTime)
if (nchar(interval) == 0) {
stop("Value for trigger.processingTime must be a non-empty string.")
}
jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
"ProcessingTime",
interval)
} else if (!is.null(trigger.once) && !is.na(trigger.once)) {
if (!is.logical(trigger.once) || !trigger.once) {
stop("Value for trigger.once must be TRUE.")
}
jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
}
options <- varargsToStrEnv(...)
write <- handledCallJMethod(df@sdf, "writeStream")
write <- callJMethod(write, "format", source)
if (!is.null(outputMode)) {
write <- callJMethod(write, "outputMode", outputMode)
}
if (!is.null(cols)) {
write <- callJMethod(write, "partitionBy", cols)
}
if (!is.null(jtrigger)) {
write <- callJMethod(write, "trigger", jtrigger)
}
write <- callJMethod(write, "options", options)
ssq <- handledCallJMethod(write, "start")
streamingQuery(ssq)
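
A minimal usage sketch of the new write.stream() arguments introduced above; the input/output paths, schema, and partition column are placeholder assumptions rather than part of this patch:

# Stream JSON files from a directory (placeholder paths and schema).
df <- read.stream("json", path = "/tmp/stream-in", schema = "name STRING, value DOUBLE")

# Partition the output by a column and fire a micro-batch every 30 seconds.
q1 <- write.stream(df, "parquet", path = "/tmp/stream-out", checkpointLocation = "/tmp/cp1",
                   partitionBy = "name", trigger.processingTime = "30 seconds")

# Or process the currently available data exactly once, then stop.
q2 <- write.stream(df, "parquet", path = "/tmp/stream-out-once", checkpointLocation = "/tmp/cp2",
                   trigger.once = TRUE)

# Passing both trigger.processingTime and trigger.once raises "Multiple triggers not allowed."
stopQuery(q1)
stopQuery(q2)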
@@ -3967,3 +4011,47 @@ setMethod("broadcast",
sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
dataFrame(sdf)
})

#' withWatermark
#'
#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
#' time before which we assume no more late data is going to arrive.
#'
#' Spark will use this watermark for several purposes:
#' \itemize{
#' \item To know when a given time window aggregation can be finalized and thus can be emitted
#' when using output modes that do not allow updates.
#' \item To minimize the amount of state that we need to keep for on-going aggregations.
#' }
#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
#' of coordinating this value across partitions, the actual watermark used is only guaranteed
#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
#' process records that arrive more than \code{delayThreshold} late.
#'
#' @param x a streaming SparkDataFrame
#' @param eventTime a string specifying the name of the Column that contains the event time of the
#' row.
#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
#' relative to the latest record that has been processed in the form of an
#' interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
#' @return a SparkDataFrame.
#' @aliases withWatermark,SparkDataFrame,character,character-method
#' @family SparkDataFrame functions
#' @rdname withWatermark
#' @name withWatermark
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
#' df <- withWatermark(df, "time", "10 minutes")
#' }
#' @note withWatermark since 2.3.0
setMethod("withWatermark",
signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
function(x, eventTime, delayThreshold) {
sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
dataFrame(sdf)
})
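
A sketch of how withWatermark() composes with a windowed aggregation in append mode, assuming a streaming source with an event-time column named "time"; the paths and durations below are placeholders:

schema <- structType(structField("time", "timestamp"), structField("value", "double"))
events <- read.stream("json", path = "/tmp/events", schema = schema)

# Late data older than (max event time seen - 10 minutes) may be dropped,
# which lets the 5-minute windows be finalized and emitted in append mode.
events <- withWatermark(events, "time", "10 minutes")
counts <- count(groupBy(events, window(events$time, "5 minutes")))
q <- write.stream(counts, "console", outputMode = "append",
                  checkpointLocation = "/tmp/cp-watermark")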
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
#' @param schema The data schema defined in structType or a DDL-formatted string, this is
#' required for file-based streaming data source
#' @param ... additional external data source specific named options, for instance \code{path} for
#' file-based streaming data source
#' file-based streaming data source. \code{timeZone} to indicate a timezone to be used to
#' parse timestamps in the JSON/CSV data sources or partition values; if it isn't set, the
#' session local timezone is used by default.
#' @return SparkDataFrame
#' @rdname read.stream
#' @name read.stream
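
A hypothetical illustration of the timeZone option described above, passed through the named options of read.stream; the path, schema, and zone ID are placeholders:

df <- read.stream("json", path = "/tmp/stream-in", schema = "ts TIMESTAMP, value DOUBLE",
                  timeZone = "America/Los_Angeles", maxFilesPerTrigger = 1)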
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
@@ -799,6 +799,12 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
setGeneric("withColumnRenamed",
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })

#' @rdname withWatermark
#' @export
setGeneric("withWatermark", function(x, eventTime, delayThreshold) {
standardGeneric("withWatermark")
})

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })
107 changes: 107 additions & 0 deletions R/pkg/tests/fulltests/test_streaming.R
@@ -172,6 +172,113 @@ test_that("Terminated by error", {
stopQuery(q)
})

test_that("PartitionBy", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
checkpointPath <- tempfile(pattern = "sparkr-test", fileext = ".checkpoint")
textPath <- tempfile(pattern = "sparkr-test", fileext = ".text")
df <- read.df(jsonPath, "json", stringSchema)
write.df(df, parquetPath, "parquet", "overwrite")

df <- read.stream(path = parquetPath, schema = stringSchema)

expect_error(write.stream(df, "json", path = textPath, checkpointLocation = "append",
partitionBy = c(1, 2)),
"All partitionBy column names should be characters")

q <- write.stream(df, "json", path = textPath, checkpointLocation = "append",
partitionBy = "name")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

dirs <- list.files(textPath)
expect_equal(length(dirs[substring(dirs, 1, nchar("name=")) == "name="]), 3)

unlink(checkpointPath)
unlink(textPath)
unlink(parquetPath)
})

test_that("Watermark", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
schema <- structType(structField("value", "string"))
t <- Sys.time()
df <- as.DataFrame(lapply(list(t), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
df <- read.stream(path = parquetPath, schema = "value STRING")
df <- withColumn(df, "eventTime", cast(df$value, "timestamp"))
df <- withWatermark(df, "eventTime", "10 seconds")
counts <- count(group_by(df, "eventTime"))
q <- write.stream(counts, "memory", queryName = "times", outputMode = "append")

# first events
df <- as.DataFrame(lapply(list(t + 1, t, t + 2), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# advance watermark to 15
df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# old events, should be dropped
df <- as.DataFrame(lapply(list(t), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# evict events less than previous watermark
df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

times <- collect(sql("SELECT * FROM times"))
# looks like write timing can affect the first bucket; but it should be t
expect_equal(times[order(times$eventTime),][1, 2], 2)

stopQuery(q)
unlink(parquetPath)
})

test_that("Trigger", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
schema <- structType(structField("value", "string"))
df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
df <- read.stream(path = parquetPath, schema = "value STRING")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = "", trigger.once = ""), "Multiple triggers not allowed.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = ""),
"Value for trigger.processingTime must be a non-empty string.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = "invalid"), "illegal argument")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.once = ""), "Value for trigger.once must be TRUE.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.once = FALSE), "Value for trigger.once must be TRUE.")

q <- write.stream(df, "memory", queryName = "times", outputMode = "append", trigger.once = TRUE)
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")
df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

expect_equal(nrow(collect(sql("SELECT * FROM times"))), 1)

stopQuery(q)
unlink(parquetPath)
})

unlink(jsonPath)
unlink(jsonPathNa)

4 changes: 4 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -793,6 +793,10 @@ def trigger(self, processingTime=None, once=None):
.. note:: Evolving.

:param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
Set a trigger that runs a query periodically based on the processing
time. Only one trigger can be set.
:param once: if set to True, set a trigger that processes only one batch of data in a
streaming query then terminates the query. Only one trigger can be set.

>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.streaming.Trigger

/**
* A [[Trigger]] that process only one batch of data in a streaming query then terminates
* A [[Trigger]] that processes only one batch of data in a streaming query then terminates
* the query.
*/
@Experimental