Branch 2.3 #20185

Closed
wants to merge 142 commits into from
142 commits
5244aaf
[SPARK-22897][CORE] Expose stageAttemptId in TaskContext
advancedxy Jan 2, 2018
b96a213
[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.
juliuszsompolski Jan 3, 2018
a05e85e
[SPARK-22934][SQL] Make optional clauses order insensitive for CREATE…
gatorsmile Jan 3, 2018
b962488
[SPARK-20236][SQL] dynamic partition overwrite
cloud-fan Jan 3, 2018
27c949d
[SPARK-22932][SQL] Refactor AnalysisContext
gatorsmile Jan 2, 2018
79f7263
[SPARK-22896] Improvement in String interpolation
chetkhatri Jan 3, 2018
a51212b
[SPARK-20960][SQL] make ColumnVector public
cloud-fan Jan 3, 2018
f51c8fd
[SPARK-22944][SQL] improve FoldablePropagation
cloud-fan Jan 4, 2018
1860a43
[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, t…
felixcheung Jan 4, 2018
a7cfd6b
[SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent
yaooqinn Jan 4, 2018
eb99b8a
[SPARK-22945][SQL] add java UDF APIs in the functions object
cloud-fan Jan 4, 2018
1f5e354
[SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
gatorsmile Jan 4, 2018
bcfeef5
[SPARK-22771][SQL] Add a missing return statement in Concat.checkInpu…
maropu Jan 4, 2018
cd92913
[SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for ex…
jerryshao Jan 4, 2018
bc4bef4
[SPARK-22850][CORE] Ensure queued events are delivered to all event q…
Jan 4, 2018
2ab4012
[SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Jan 4, 2018
84707f0
[SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-…
liyinan926 Jan 4, 2018
ea9da61
[SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly.
Jan 5, 2018
158f7e6
[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
juliuszsompolski Jan 5, 2018
145820b
[SPARK-22825][SQL] Fix incorrect results of Casting Array to String
maropu Jan 5, 2018
5b524cc
[SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed…
MrBago Jan 5, 2018
f9dcdbc
[SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode
liyinan926 Jan 5, 2018
fd4e304
[SPARK-22961][REGRESSION] Constant columns should generate QueryPlanC…
adrian-ionescu Jan 5, 2018
0a30e93
[SPARK-22940][SQL] HiveExternalCatalogVersionsSuite should succeed on…
bersprockets Jan 5, 2018
d1f422c
[SPARK-13030][ML] Follow-up cleanups for OneHotEncoderEstimator
jkbradley Jan 5, 2018
55afac4
[SPARK-22914][DEPLOY] Register history.ui.port
gerashegalov Jan 6, 2018
bf85301
[SPARK-22937][SQL] SQL elt output binary for binary inputs
maropu Jan 6, 2018
3e3e938
[SPARK-22960][K8S] Revert use of ARG base_image in images
liyinan926 Jan 6, 2018
7236914
[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs…
icexelloss Jan 6, 2018
e6449e8
[SPARK-22793][SQL] Memory leak in Spark Thrift Server
Jan 6, 2018
0377755
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'Par…
fjh100456 Jan 6, 2018
b66700a
[SPARK-22901][PYTHON][FOLLOWUP] Adds the doc for asNondeterministic f…
HyukjinKwon Jan 6, 2018
f9e7b0c
[HOTFIX] Fix style checking failure
gatorsmile Jan 6, 2018
285d342
[SPARK-22973][SQL] Fix incorrect results of Casting Map to String
maropu Jan 7, 2018
7673e9c
[SPARK-22985] Fix argument escaping bug in from_utc_timestamp / to_ut…
JoshRosen Jan 8, 2018
a1d3352
[SPARK-22566][PYTHON] Better error message for `_merge_type` in Panda…
gberger-palantir Jan 8, 2018
8bf24e9
[SPARK-22979][PYTHON][SQL] Avoid per-record type dispatch in Python d…
HyukjinKwon Jan 8, 2018
6964dfe
[SPARK-22983] Don't push filters beneath aggregates with empty groupi…
JoshRosen Jan 8, 2018
4a45f0a
[SPARK-21865][SQL] simplify the distribution semantic of Spark SQL
cloud-fan Jan 8, 2018
06fd842
[SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemp…
advancedxy Jan 8, 2018
eecd83c
[SPARK-22992][K8S] Remove assumption of the DNS domain
foxish Jan 8, 2018
8032cf8
[SPARK-22972] Couldn't find corresponding Hive SerDe for data source …
xubo245 Jan 9, 2018
850b9f3
[SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab
gengliangwang Jan 9, 2018
fd46a27
[SPARK-22984] Fix incorrect bitmap copying and offset adjustment in G…
JoshRosen Jan 9, 2018
911a4db
[SPARK-21292][DOCS] refreshtable example
felixcheung Jan 9, 2018
a23c07e
[SPARK-21293][SPARKR][DOCS] structured streaming doc update
felixcheung Jan 9, 2018
e79480e
[SPARK-23000] Fix Flaky test suite DataSourceWithHiveMetastoreCatalog…
gatorsmile Jan 9, 2018
47f975b
[SPARK-22998][K8S] Set missing value for SPARK_MOUNTED_CLASSPATH in t…
liyinan926 Jan 9, 2018
60f6b99
[SPARK-16060][SQL] Support Vectorized ORC Reader
dongjoon-hyun Jan 9, 2018
be59919
[SPARK-22981][SQL] Fix incorrect results of Casting Struct to String
maropu Jan 9, 2018
44763d9
[SPARK-22912] v2 data source support in MicroBatchExecution
jose-torres Jan 8, 2018
df047bd
[SPARK-23005][CORE] Improve RDD.take on small number of partitions
gengliangwang Jan 10, 2018
45f5c3c
[MINOR] fix a typo in BroadcastJoinSuite
cloud-fan Jan 10, 2018
20a8c88
[SPARK-23018][PYTHON] Fix createDataFrame from Pandas timestamp serie…
BryanCutler Jan 10, 2018
162c5be
[SPARK-22982] Remove unsafe asynchronous close() call from FileDownlo…
JoshRosen Jan 10, 2018
ecc24ec
[SPARK-16060][SQL][FOLLOW-UP] add a wrapper solution for vectorized o…
cloud-fan Jan 10, 2018
2db5239
[SPARK-22993][ML] Clarify HasCheckpointInterval param doc
sethah Jan 10, 2018
60d4d79
[SPARK-22997] Add additional defenses against use of freed MemoryBlocks
JoshRosen Jan 10, 2018
5b5851c
[SPARK-23019][CORE] Wait until SparkContext.stop() finished in SparkL…
gengliangwang Jan 10, 2018
eb4fa55
[SPARK-22951][SQL] fix aggregation after dropDuplicates on empty data…
Jan 10, 2018
551ccfb
[SPARK-23009][PYTHON] Fix for non-str col names to createDataFrame fr…
BryanCutler Jan 10, 2018
317b0aa
[SPARK-22587] Spark job fails if fs.defaultFS and application jar are…
Jan 11, 2018
d9a973d
[SPARK-23001][SQL] Fix NullPointerException when DESC a database with…
gatorsmile Jan 11, 2018
b781301
[SPARK-20657][CORE] Speed up rendering of the stages page.
Jan 11, 2018
7995989
[SPARK-22967][TESTS] Fix VersionSuite's unit tests by change Windows …
Ngone51 Jan 11, 2018
9ca0f6e
[SPARK-23000][TEST-HADOOP2.6] Fix Flaky test suite DataSourceWithHive…
gatorsmile Jan 11, 2018
f624850
[SPARK-19732][FOLLOW-UP] Document behavior changes made in na.fill an…
gatorsmile Jan 11, 2018
b94debd
[SPARK-22994][K8S] Use a single image for all Spark containers.
Jan 11, 2018
f891ee3
[SPARK-22908] Add kafka source and sink for continuous processing.
jose-torres Jan 11, 2018
2ec3026
[SPARK-23046][ML][SPARKR] Have RFormula include VectorSizeHint in pip…
MrBago Jan 11, 2018
964cc2e
Preparing Spark release v2.3.0-rc1
sameeragarwal Jan 11, 2018
6bb2296
Preparing development version 2.3.1-SNAPSHOT
sameeragarwal Jan 11, 2018
55695c7
[SPARK-23008][ML] OnehotEncoderEstimator python API
WeichenXu123 Jan 12, 2018
3ae3e1b
[SPARK-22986][CORE] Use a cache to avoid instantiating multiple insta…
ho3rexqj Jan 12, 2018
d512d87
[SPARK-23008][ML][FOLLOW-UP] mark OneHotEncoder python API deprecated
WeichenXu123 Jan 12, 2018
6152da3
[SPARK-23025][SQL] Support Null type in scala reflection
mgaido91 Jan 12, 2018
db27a93
[MINOR][BUILD] Fix Java linter errors
dongjoon-hyun Jan 12, 2018
02176f4
[SPARK-22975][SS] MetricsReporter should not throw exception when the…
mgaido91 Jan 12, 2018
60bcb46
Revert "[SPARK-22908] Add kafka source and sink for continuous proces…
sameeragarwal Jan 12, 2018
ca27d9c
[SPARK-22980][PYTHON][SQL] Clarify the length of each series is of ea…
HyukjinKwon Jan 13, 2018
801ffd7
[SPARK-22870][CORE] Dynamic allocation should allow 0 idle time
wangyum Jan 13, 2018
8d32ed5
[SPARK-23036][SQL][TEST] Add withGlobalTempView for testing
xubo245 Jan 13, 2018
0fc5533
[SPARK-22790][SQL] add a configurable factor to describe HadoopFsRela…
CodingCat Jan 13, 2018
bcd87ae
[SPARK-21213][SQL][FOLLOWUP] Use compatible types for comparisons in …
maropu Jan 13, 2018
1f4a08b
[SPARK-23063][K8S] K8s changes for publishing scripts (and a couple o…
foxish Jan 14, 2018
a335a49
[SPARK-23038][TEST] Update docker/spark-test (JDK/OS)
dongjoon-hyun Jan 14, 2018
0d425c3
[SPARK-23069][DOCS][SPARKR] fix R doc for describe missing text
felixcheung Jan 14, 2018
5fbbd94
[SPARK-23021][SQL] AnalysisBarrier should override innerChildren to p…
maropu Jan 14, 2018
9051e1a
[SPARK-23051][CORE] Fix for broken job description in Spark UI
smurakozi Jan 14, 2018
2879236
[SPARK-22999][SQL] show databases like command' can remove the like k…
Jan 14, 2018
30574fd
[SPARK-23054][SQL] Fix incorrect results of casting UserDefinedType t…
maropu Jan 15, 2018
81b9899
[SPARK-23049][SQL] `spark.sql.files.ignoreCorruptFiles` should work f…
dongjoon-hyun Jan 15, 2018
188999a
[SPARK-23023][SQL] Cast field data to strings in showString
maropu Jan 15, 2018
3491ca4
[SPARK-19550][BUILD][FOLLOW-UP] Remove MaxPermSize for sql module
wangyum Jan 15, 2018
c6a3b92
[SPARK-23070] Bump previousSparkVersion in MimaBuild.scala to be 2.2.0
gatorsmile Jan 15, 2018
706a308
[SPARK-23035][SQL] Fix improper information of TempTableAlreadyExists…
xubo245 Jan 15, 2018
bb8e5ad
[SPARK-23080][SQL] Improve error message for built-in functions
mgaido91 Jan 16, 2018
e2ffb97
[SPARK-23000] Use fully qualified table names in HiveMetastoreCatalog…
sameeragarwal Jan 16, 2018
e58c4a9
[SPARK-22956][SS] Bug fix for 2 streams union failover scenario
xuanyuanking Jan 16, 2018
20c6981
[SPARK-23020][CORE] Fix races in launcher code, test.
Jan 16, 2018
5c06ee2
[SPARK-22978][PYSPARK] Register Vectorized UDFs for SQL Statement
gatorsmile Jan 16, 2018
863ffdc
[SPARK-22392][SQL] data source v2 columnar batch reader
cloud-fan Jan 16, 2018
833a584
[SPARK-23045][ML][SPARKR] Update RFormula to use OneHotEncoderEstimator.
MrBago Jan 16, 2018
41d1a32
[SPARK-23095][SQL] Decorrelation of scalar subquery fails with java.u…
dilipbiswal Jan 17, 2018
08252bb
[SPARK-22361][SQL][TEST] Add unit test for Window Frames
gaborgsomogyi Jan 17, 2018
0a441d2
[SPARK-22908][SS] Roll forward continuous processing Kafka support wi…
jose-torres Jan 17, 2018
b9339ee
Revert "[SPARK-23020][CORE] Fix races in launcher code, test."
sameeragarwal Jan 17, 2018
00c744e
Fix merge between 07ae39d0ec and 1667057851
jose-torres Jan 17, 2018
8ef323c
[SPARK-23072][SQL][TEST] Add a Unicode schema test for file-based dat…
dongjoon-hyun Jan 17, 2018
bfbc2d4
[SPARK-23062][SQL] Improve EXCEPT documentation
Jan 17, 2018
cbb6bda
[SPARK-21783][SQL] Turn on ORC filter push-down by default
dongjoon-hyun Jan 17, 2018
aae73a2
[SPARK-23079][SQL] Fix query constraints propagation with aliases
gengliangwang Jan 17, 2018
1a6dfaf
[SPARK-23020] Ignore Flaky Test: SparkLauncherSuite.testInProcessLaun…
sameeragarwal Jan 17, 2018
dbd2a55
[SPARK-23033][SS] Don't use task level retry for continuous processing
jose-torres Jan 17, 2018
79ccd0c
[SPARK-23093][SS] Don't change run id when reconfiguring a continuous…
jose-torres Jan 17, 2018
6e509fd
[SPARK-23047][PYTHON][SQL] Change MapVector to NullableMapVector in A…
icexelloss Jan 17, 2018
b84c2a3
[SPARK-23132][PYTHON][ML] Run doctests in ml.image when testing
HyukjinKwon Jan 17, 2018
9783aea
[SPARK-23119][SS] Minor fixes to V2 streaming APIs
tdas Jan 18, 2018
050c1e2
[SPARK-23064][DOCS][SS] Added documentation for stream-stream joins
tdas Jan 18, 2018
f2688ef
[SPARK-21996][SQL] read files with space in name for streaming
xysun Jan 18, 2018
3a80cc5
[SPARK-23122][PYTHON][SQL] Deprecate register* for UDFs in SQLContext…
HyukjinKwon Jan 18, 2018
2a87c3a
[SPARK-23052][SS] Migrate ConsoleSink to data source V2 api.
jose-torres Jan 18, 2018
f801ac4
[SPARK-23140][SQL] Add DataSourceV2Strategy to Hive Session state's p…
jerryshao Jan 18, 2018
8a98274
[SPARK-22036][SQL] Decimal multiplication with high precision/scale o…
mgaido91 Jan 18, 2018
e0421c6
[SPARK-23141][SQL][PYSPARK] Support data type string as a returnType …
ueshin Jan 18, 2018
bd0a162
[SPARK-23147][UI] Fix task page table IndexOutOfBound Exception
jerryshao Jan 18, 2018
bfdbdd3
[SPARK-23029][DOCS] Specifying default units of configuration entries
ferdonline Jan 18, 2018
e6e8bbe
[SPARK-23143][SS][PYTHON] Added python API for setting continuous tri…
tdas Jan 18, 2018
1f88fcd
[SPARK-23144][SS] Added console sink for continuous processing
tdas Jan 18, 2018
b8c6d93
[SPARK-23133][K8S] Fix passing java options to Executor
andrusha Jan 18, 2018
a295034
[SPARK-23094] Fix invalid character handling in JsonDataSource
brkyvz Jan 18, 2018
7057e31
[SPARK-22962][K8S] Fail fast if submission client local files are used
liyinan926 Jan 18, 2018
acf3b70
[SPARK-23142][SS][DOCS] Added docs for continuous processing
tdas Jan 19, 2018
225b1af
[DOCS] change to dataset for java code in structured-streaming-kafka-…
brandonJY Jan 19, 2018
541dbc0
[SPARK-23054][SQL][PYSPARK][FOLLOWUP] Use sqlType casting when castin…
ueshin Jan 19, 2018
54c1fae
[BUILD][MINOR] Fix java style check issues
sameeragarwal Jan 19, 2018
e582231
[SPARK-23127][DOC] Update FeatureHasher guide for categoricalCols par…
Jan 19, 2018
ef7989d
[SPARK-23048][ML] Add OneHotEncoderEstimator document and examples
viirya Jan 19, 2018
b7a8199
[SPARK-23089][STS] Recreate session log directory if it doesn't exist
mgaido91 Jan 19, 2018
8d6845c
[SPARK-23000][TEST] Keep Derby DB Location Unchanged After Session Cl…
gatorsmile Jan 19, 2018
55efeff
[SPARK-23149][SQL] polish ColumnarBatch
cloud-fan Jan 19, 2018
ffe4591
[SPARK-23104][K8S][DOCS] Changes to Kubernetes scheduler documentation
foxish Jan 19, 2018
Files changed
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 2.3.0
Version: 2.3.1
Title: R Frontend for Apache Spark
Description: Provides an R Frontend for Apache Spark.
Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -179,6 +179,7 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
"withWatermark",
"write.df",
"write.jdbc",
"write.json",
102 changes: 95 additions & 7 deletions R/pkg/R/DataFrame.R
@@ -2853,7 +2853,7 @@ setMethod("intersect",
#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT} in SQL.
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT DISTINCT} in SQL.
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
@@ -3054,10 +3054,10 @@ setMethod("describe",
#' \item stddev
#' \item min
#' \item max
#' \item arbitrary approximate percentiles specified as a percentage (eg, "75%")
#' \item arbitrary approximate percentiles specified as a percentage (eg, "75\%")
#' }
#' If no statistics are given, this function computes count, mean, stddev, min,
#' approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
#' approximate quartiles (percentiles at 25\%, 50\%, and 75\%), and max.
#' This function is meant for exploratory data analysis, as we make no guarantee about the
#' backward compatibility of the schema of the resulting Dataset. If you want to
#' programmatically compute summary statistics, use the \code{agg} function instead.
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
#' isStreaming
#'
#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
#' as it arrives.
#' as it arrives. A dataset that reads data from a streaming source must be executed as a
#' \code{StreamingQuery} using \code{write.stream}.
#'
#' @param x A SparkDataFrame
#' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
#' @param df a streaming SparkDataFrame.
#' @param source a name for external data source.
#' @param outputMode one of 'append', 'complete', 'update'.
#' @param ... additional argument(s) passed to the method.
#' @param partitionBy a name or a list of names of columns to partition the output by on the file
#' system. If specified, the output is laid out on the file system similar to Hive's
#' partitioning scheme.
#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
#' '1 minute'. This is a trigger that runs a query periodically based on the processing
#' time. If value is '0 seconds', the query will run as fast as possible; this is the
#' default. Only one trigger can be set.
#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
#' one batch of data in a streaming query then terminates the query. Only one trigger can be
#' set.
#' @param ... additional external data source specific named options.
#'
#' @family SparkDataFrame functions
#' @seealso \link{read.stream}
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
#' # console
#' q <- write.stream(wordCounts, "console", outputMode = "complete")
#' # text stream
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
#' partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
#' # memory stream
#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
#' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
#' @note experimental
setMethod("write.stream",
signature(df = "SparkDataFrame"),
function(df, source = NULL, outputMode = NULL, ...) {
function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
trigger.processingTime = NULL, trigger.once = NULL, ...) {
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the data source specified ",
"in 'spark.sql.sources.default' configuration by default.")
@@ -3748,12 +3761,43 @@ setMethod("write.stream",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
cols <- NULL
if (!is.null(partitionBy)) {
if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
stop("All partitionBy column names should be characters.")
}
cols <- as.list(partitionBy)
}
jtrigger <- NULL
if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
if (!is.null(trigger.once)) {
stop("Multiple triggers not allowed.")
}
interval <- as.character(trigger.processingTime)
if (nchar(interval) == 0) {
stop("Value for trigger.processingTime must be a non-empty string.")
}
jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
"ProcessingTime",
interval)
} else if (!is.null(trigger.once) && !is.na(trigger.once)) {
if (!is.logical(trigger.once) || !trigger.once) {
stop("Value for trigger.once must be TRUE.")
}
jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
}
options <- varargsToStrEnv(...)
write <- handledCallJMethod(df@sdf, "writeStream")
write <- callJMethod(write, "format", source)
if (!is.null(outputMode)) {
write <- callJMethod(write, "outputMode", outputMode)
}
if (!is.null(cols)) {
write <- callJMethod(write, "partitionBy", cols)
}
if (!is.null(jtrigger)) {
write <- callJMethod(write, "trigger", jtrigger)
}
write <- callJMethod(write, "options", options)
ssq <- handledCallJMethod(write, "start")
streamingQuery(ssq)
@@ -3967,3 +4011,47 @@ setMethod("broadcast",
sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
dataFrame(sdf)
})

#' withWatermark
#'
#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
#' time before which we assume no more late data is going to arrive.
#'
#' Spark will use this watermark for several purposes:
#' \itemize{
#' \item To know when a given time window aggregation can be finalized and thus can be emitted
#' when using output modes that do not allow updates.
#' \item To minimize the amount of state that we need to keep for on-going aggregations.
#' }
#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
#' of coordinating this value across partitions, the actual watermark used is only guaranteed
#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
#' process records that arrive more than \code{delayThreshold} late.
#'
#' @param x a streaming SparkDataFrame
#' @param eventTime a string specifying the name of the Column that contains the event time of the
#' row.
#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
#' relative to the latest record that has been processed in the form of an
#' interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
#' @return a SparkDataFrame.
#' @aliases withWatermark,SparkDataFrame,character,character-method
#' @family SparkDataFrame functions
#' @rdname withWatermark
#' @name withWatermark
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
#' df <- withWatermark(df, "time", "10 minutes")
#' }
#' @note withWatermark since 2.3.0
setMethod("withWatermark",
signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
function(x, eventTime, delayThreshold) {
sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
dataFrame(sdf)
})
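
Taken together, the DataFrame.R changes above add withWatermark plus the partitionBy, trigger.processingTime, and trigger.once arguments to write.stream. A minimal sketch of how they might be combined; the paths, schema, and column names below are hypothetical illustrations, not part of this change:

library(SparkR)
sparkR.session()

# File-based streaming source; the schema is given as a DDL string.
events <- read.stream("json", path = "/tmp/events-in",
                      schema = "ts TIMESTAMP, region STRING, value DOUBLE")

# New in this branch: declare an event-time watermark so records arriving more
# than 10 minutes late can be dropped and aggregation state stays bounded.
events <- withWatermark(events, "ts", "10 minutes")

# Windowed count per region over 5-minute event-time windows.
counts <- count(groupBy(events, window(events$ts, "5 minutes"), events$region))

# New in this branch: partitionBy and trigger.processingTime arguments on write.stream.
q <- write.stream(counts, "parquet",
                  path = "/tmp/events-out",
                  checkpointLocation = "/tmp/events-cp",
                  outputMode = "append",
                  partitionBy = "region",
                  trigger.processingTime = "30 seconds")

awaitTermination(q, 60 * 1000)
stopQuery(q)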
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
#' @param schema The data schema defined in structType or a DDL-formatted string, this is
#' required for file-based streaming data source
#' @param ... additional external data source specific named options, for instance \code{path} for
#' file-based streaming data source
#' file-based streaming data source. \code{timeZone} to indicate a timezone to be used to
#' parse timestamps in the JSON/CSV data sources or partition values; If it isn't set, it
#' uses the default value, session local timezone.
#' @return SparkDataFrame
#' @rdname read.stream
#' @name read.stream
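
The timeZone option documented above applies when parsing timestamps in JSON/CSV streaming sources; a brief sketch, where the path, schema, and options are hypothetical:

# Parse CSV timestamps with an explicit time zone instead of the session-local default.
df <- read.stream("csv", path = "/tmp/csv-in",
                  schema = "id INT, ts TIMESTAMP",
                  header = "true",
                  timeZone = "UTC")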
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
@@ -799,6 +799,12 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
setGeneric("withColumnRenamed",
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })

#' @rdname withWatermark
#' @export
setGeneric("withWatermark", function(x, eventTime, delayThreshold) {
standardGeneric("withWatermark")
})

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })
2 changes: 2 additions & 0 deletions R/pkg/R/mllib_recommendation.R
@@ -48,6 +48,8 @@ setClass("ALSModel", representation(jobj = "jobj"))
#' @param numUserBlocks number of user blocks used to parallelize computation (> 0).
#' @param numItemBlocks number of item blocks used to parallelize computation (> 0).
#' @param checkpointInterval number of checkpoint intervals (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param ... additional argument(s) passed to the method.
#' @return \code{spark.als} returns a fitted ALS model.
#' @rdname spark.als
6 changes: 6 additions & 0 deletions R/pkg/R/mllib_tree.R
@@ -161,6 +161,8 @@ print.summary.decisionTree <- function(x) {
#' >= 1.
#' @param minInfoGain Minimum information gain for a split to be considered at a tree node.
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
@@ -382,6 +384,8 @@ setMethod("write.ml", signature(object = "GBTClassificationModel", path = "chara
#' @param minInstancesPerNode Minimum number of instances each child must have after split.
#' @param minInfoGain Minimum information gain for a split to be considered at a tree node.
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
@@ -595,6 +599,8 @@ setMethod("write.ml", signature(object = "RandomForestClassificationModel", path
#' @param minInstancesPerNode Minimum number of instances each child must have after split.
#' @param minInfoGain Minimum information gain for a split to be considered at a tree node.
#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1).
#' Note: this setting will be ignored if the checkpoint directory is not
#' set.
#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation.
#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with
#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching
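
The checkpointInterval notes added above only take effect once a checkpoint directory has been set on the session; a minimal sketch, with a hypothetical path and dataset:

sparkR.session()
setCheckpointDir("/tmp/spark-checkpoints")  # without this, checkpointInterval is ignored

training <- read.df("/tmp/training.parquet", "parquet")  # hypothetical data with a 'label' column
model <- spark.randomForest(training, label ~ ., type = "regression",
                            numTrees = 50, checkpointInterval = 5)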
107 changes: 107 additions & 0 deletions R/pkg/tests/fulltests/test_streaming.R
@@ -172,6 +172,113 @@ test_that("Terminated by error", {
stopQuery(q)
})

test_that("PartitionBy", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
checkpointPath <- tempfile(pattern = "sparkr-test", fileext = ".checkpoint")
textPath <- tempfile(pattern = "sparkr-test", fileext = ".text")
df <- read.df(jsonPath, "json", stringSchema)
write.df(df, parquetPath, "parquet", "overwrite")

df <- read.stream(path = parquetPath, schema = stringSchema)

expect_error(write.stream(df, "json", path = textPath, checkpointLocation = "append",
partitionBy = c(1, 2)),
"All partitionBy column names should be characters")

q <- write.stream(df, "json", path = textPath, checkpointLocation = "append",
partitionBy = "name")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

dirs <- list.files(textPath)
expect_equal(length(dirs[substring(dirs, 1, nchar("name=")) == "name="]), 3)

unlink(checkpointPath)
unlink(textPath)
unlink(parquetPath)
})

test_that("Watermark", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
schema <- structType(structField("value", "string"))
t <- Sys.time()
df <- as.DataFrame(lapply(list(t), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
df <- read.stream(path = parquetPath, schema = "value STRING")
df <- withColumn(df, "eventTime", cast(df$value, "timestamp"))
df <- withWatermark(df, "eventTime", "10 seconds")
counts <- count(group_by(df, "eventTime"))
q <- write.stream(counts, "memory", queryName = "times", outputMode = "append")

# first events
df <- as.DataFrame(lapply(list(t + 1, t, t + 2), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# advance watermark to 15
df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# old events, should be dropped
df <- as.DataFrame(lapply(list(t), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# evict events less than previous watermark
df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

times <- collect(sql("SELECT * FROM times"))
# looks like write timing can affect the first bucket; but it should be t
expect_equal(times[order(times$eventTime),][1, 2], 2)

stopQuery(q)
unlink(parquetPath)
})

test_that("Trigger", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
schema <- structType(structField("value", "string"))
df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
df <- read.stream(path = parquetPath, schema = "value STRING")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = "", trigger.once = ""), "Multiple triggers not allowed.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = ""),
"Value for trigger.processingTime must be a non-empty string.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = "invalid"), "illegal argument")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.once = ""), "Value for trigger.once must be TRUE.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.once = FALSE), "Value for trigger.once must be TRUE.")

q <- write.stream(df, "memory", queryName = "times", outputMode = "append", trigger.once = TRUE)
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")
df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

expect_equal(nrow(collect(sql("SELECT * FROM times"))), 1)

stopQuery(q)
unlink(parquetPath)
})

unlink(jsonPath)
unlink(jsonPathNa)

2 changes: 1 addition & 1 deletion R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -1042,7 +1042,7 @@ unlink(modelPath)

## Structured Streaming

SparkR supports the Structured Streaming API (experimental).
SparkR supports the Structured Streaming API.

You can check the Structured Streaming Programming Guide for [an introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model) to its programming model and basic concepts.

2 changes: 1 addition & 1 deletion assembly/pom.xml
@@ -21,7 +21,7 @@
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.3.0-SNAPSHOT</version>
<version>2.3.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
