Skip to content

Commit

Permalink
[SPARK-23143][SS][PYTHON] Added python API for setting continuous tri…
Browse files Browse the repository at this point in the history
…gger

## What changes were proposed in this pull request?
Self-explanatory.

## How was this patch tested?
New python tests.

Author: Tathagata Das <[email protected]>

Closes #20309 from tdas/SPARK-23143.
  • Loading branch information
tdas committed Jan 18, 2018
1 parent 9678941 commit 2d41f04
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 4 deletions.
23 changes: 19 additions & 4 deletions python/pyspark/sql/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ def queryName(self, queryName):

@keyword_only
@since(2.0)
def trigger(self, processingTime=None, once=None):
def trigger(self, processingTime=None, once=None, continuous=None):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
Expand All @@ -802,23 +802,38 @@ def trigger(self, processingTime=None, once=None):
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
>>> # trigger the query for just once batch of data
>>> writer = sdf.writeStream.trigger(once=True)
>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(continuous='5 seconds')
"""
params = [processingTime, once, continuous]

if params.count(None) == 3:
raise ValueError('No trigger provided')
elif params.count(None) < 2:
raise ValueError('Multiple triggers not allowed.')

jTrigger = None
if processingTime is not None:
if once is not None:
raise ValueError('Multiple triggers not allowed.')
if type(processingTime) != str or len(processingTime.strip()) == 0:
raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
processingTime)
interval = processingTime.strip()
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
interval)

elif once is not None:
if once is not True:
raise ValueError('Value for once must be True. Got: %s' % once)
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()

else:
raise ValueError('No trigger provided')
if type(continuous) != str or len(continuous.strip()) == 0:
raise ValueError('Value for continuous must be a non empty string. Got: %s' %
continuous)
interval = continuous.strip()
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(
interval)

self._jwrite = self._jwrite.trigger(jTrigger)
return self

Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,12 @@ def test_stream_trigger(self):
except ValueError:
pass

# Should not take multiple args
try:
df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')
except ValueError:
pass

# Should take only keyword args
try:
df.writeStream.trigger('5 seconds')
Expand Down

0 comments on commit 2d41f04

Please sign in to comment.