[SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users

## What changes were proposed in this pull request?

This patch proposes moving all Trigger implementations to `Triggers.scala`, so that these implementations are no longer exposed to end users and end users only deal with the `Trigger.xxx` static methods. This fits the intent behind the deprecation of `ProcessingTime`, and we agreed to move the others without deprecation since this patch will ship in a major version (Spark 3.0.0).
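
As a rough illustration (not part of this diff; the source, sink, app name, and interval below are arbitrary examples), end-user code continues to touch only the `Trigger.xxx` static methods, while the concrete trigger classes stay internal to `org.apache.spark.sql.execution.streaming`:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// End-user code references only the public Trigger factory methods; the
// concrete ProcessingTimeTrigger/ContinuousTrigger/OneTimeTrigger classes
// are no longer part of the user-facing API surface.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("trigger-example")
  .getOrCreate()

val query = spark.readStream
  .format("rate")        // built-in testing source, used here only as an example
  .load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))  // Trigger.Once() or Trigger.Continuous("1 second") are selected the same way
  .start()

query.awaitTermination()
```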

## How was this patch tested?

Unit tests were modified to work with the newly introduced classes.

Closes #24996 from HeartSaVioR/SPARK-28199.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
HeartSaVioR authored and srowen committed Jul 14, 2019
1 parent 76079fa commit 7548a88
Showing 14 changed files with 114 additions and 217 deletions.
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.ContinuousTrigger
import org.apache.spark.sql.streaming.Trigger

// Run tests in KafkaSourceSuiteBase in continuous execution mode.
6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -372,7 +372,11 @@ object MimaExcludes {

// [SPARK-26616][MLlib] Expose document frequency in IDFModel
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf"),

// [SPARK-28199][SS] Remove deprecated ProcessingTime
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$")
)

// Exclude rules for 2.4.x
@@ -20,9 +20,10 @@
import java.util.concurrent.TimeUnit;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
import scala.concurrent.duration.Duration;

import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;

/**
@@ -40,7 +41,7 @@ public class Trigger {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long intervalMs) {
return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
}

/**
@@ -56,7 +57,7 @@ public static Trigger ProcessingTime(long intervalMs) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
return ProcessingTime.create(interval, timeUnit);
return ProcessingTimeTrigger.create(interval, timeUnit);
}

/**
@@ -71,7 +72,7 @@ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(Duration interval) {
return ProcessingTime.apply(interval);
return ProcessingTimeTrigger.apply(interval);
}

/**
@@ -84,7 +85,7 @@ public static Trigger ProcessingTime(Duration interval) {
* @since 2.2.0
*/
public static Trigger ProcessingTime(String interval) {
return ProcessingTime.apply(interval);
return ProcessingTimeTrigger.apply(interval);
}

/**
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchSt
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock

class MicroBatchExecution(
@@ -51,7 +51,7 @@
@volatile protected var sources: Seq[SparkDataStream] = Seq.empty

private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{Clock, SystemClock}

trait TriggerExecutor {
@@ -43,10 +42,12 @@ case class OneTimeExecutor() extends TriggerExecutor {
/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/
case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
case class ProcessingTimeExecutor(
processingTimeTrigger: ProcessingTimeTrigger,
clock: Clock = new SystemClock())
extends TriggerExecutor with Logging {

private val intervalMs = processingTime.intervalMs
private val intervalMs = processingTimeTrigger.intervalMs
require(intervalMs >= 0)

override def execute(triggerHandler: () => Boolean): Unit = {
@@ -17,13 +17,94 @@

package org.apache.spark.sql.execution.streaming

import java.util.concurrent.TimeUnit

import scala.concurrent.duration.Duration

import org.apache.spark.annotation.{Evolving, Experimental}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.unsafe.types.CalendarInterval

private object Triggers {
def validate(intervalMs: Long): Unit = {
require(intervalMs >= 0, "the interval of trigger should not be negative")
}

def convert(interval: String): Long = {
val cal = CalendarInterval.fromCaseInsensitiveString(interval)
if (cal.months > 0) {
throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
}
TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
}

def convert(interval: Duration): Long = interval.toMillis

def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
}

/**
* A [[Trigger]] that processes only one batch of data in a streaming query then terminates
* the query.
*/
@Experimental

Review comments on this line:

zsxwing (Member) commented on Jul 19, 2019:
@HeartSaVioR could you also remove these unnecessary annotations since these APIs are private?

HeartSaVioR (Author, Contributor) commented on Jul 19, 2019:
Thanks for pointing out! Will raise follow-up PR soon.

srowen (Member) commented on Jul 19, 2019:
(There may be more like this throughout the code)

@Evolving
case object OneTimeTrigger extends Trigger
private[sql] case object OneTimeTrigger extends Trigger

/**
* A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
* the query will run as fast as possible.
*/
@Evolving
private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
Triggers.validate(intervalMs)
}

private[sql] object ProcessingTimeTrigger {
import Triggers._

def apply(interval: String): ProcessingTimeTrigger = {
ProcessingTimeTrigger(convert(interval))
}

def apply(interval: Duration): ProcessingTimeTrigger = {
ProcessingTimeTrigger(convert(interval))
}

def create(interval: String): ProcessingTimeTrigger = {
apply(interval)
}

def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
ProcessingTimeTrigger(convert(interval, unit))
}
}

/**
* A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at
* the specified interval.
*/
@Evolving
private[sql] case class ContinuousTrigger(intervalMs: Long) extends Trigger {
Triggers.validate(intervalMs)
}

private[sql] object ContinuousTrigger {
import Triggers._

def apply(interval: String): ContinuousTrigger = {
ContinuousTrigger(convert(interval))
}

def apply(interval: Duration): ContinuousTrigger = {
ContinuousTrigger(convert(interval))
}

def create(interval: String): ContinuousTrigger = {
apply(interval)
}

def create(interval: Long, unit: TimeUnit): ContinuousTrigger = {
ContinuousTrigger(convert(interval, unit))
}
}
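
As a side note, here is a small hypothetical snippet (assuming it lives inside the `org.apache.spark.sql` package, since these classes are now `private[sql]`; the interval values are arbitrary) showing how the factory overloads above funnel through `Triggers.convert` and `Triggers.validate`:

```scala
import java.util.concurrent.TimeUnit

import scala.concurrent.duration._

import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, ProcessingTimeTrigger}

// Every overload converts its argument to a non-negative interval in milliseconds.
val t1 = ProcessingTimeTrigger("10 seconds")            // intervalMs == 10000
val t2 = ProcessingTimeTrigger(10.seconds)              // intervalMs == 10000
val t3 = ProcessingTimeTrigger.create(10, TimeUnit.SECONDS)
val c1 = ContinuousTrigger("1 minute")                  // intervalMs == 60000

// Month/year-based intervals are rejected by Triggers.convert:
// ProcessingTimeTrigger("1 month")  // throws IllegalArgumentException
// Negative intervals fail Triggers.validate:
// ProcessingTimeTrigger(-1L)        // throws IllegalArgumentException
```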
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.util.Clock

class ContinuousExecution(
@@ -93,7 +93,7 @@
}

private val triggerExecutor = trigger match {
case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTimeTrigger(t), triggerClock)
case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
}


This file was deleted.

@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.execution.streaming.sources._
import org.apache.spark.sql.sources.v2.{SupportsWrite, TableProvider}
import org.apache.spark.sql.sources.v2.TableCapability._