
address comments
jose-torres committed Jan 4, 2018
1 parent 8d16109 commit 0bbff56
Showing 4 changed files with 21 additions and 21 deletions.
@@ -50,6 +50,11 @@ class MicroBatchExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

+ private def toJava(
+     scalaOption: Option[v2.streaming.reader.Offset]): Optional[v2.streaming.reader.Offset] = {
+   Optional.ofNullable(scalaOption.orNull)
+ }

@volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty

private val triggerExecutor = trigger match {
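
The toJava helper added above is just an Option-to-Optional bridge. A minimal standalone sketch of the same conversion (the generic signature and names below are illustrative, not the commit's code):

import java.util.Optional

// Option -> Optional: None maps to Optional.empty, Some(x) maps to Optional.of(x).
def toJavaOptional[T >: Null](scalaOption: Option[T]): Optional[T] =
  Optional.ofNullable(scalaOption.orNull)

assert(toJavaOptional(None: Option[String]) == Optional.empty[String]())
assert(toJavaOptional(Some("offset-json")) == Optional.of("offset-json"))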
@@ -269,17 +274,12 @@ class MicroBatchExecution(
case s: MicroBatchReader =>
updateStatusMessage(s"Getting offsets from $s")
reportTimeTaken("getOffset") {
- // Once v1 streaming source execution is gone, we can restructure this to be cleaner.
+ // Once v1 streaming source execution is gone, we can refactor this away.
// For now, we set the range here to get the source to infer the available end offset,
// get that offset, and then set the range again when we later execute.
- if (availableOffsets.get(s).isDefined) {
-   val offsetJson = availableOffsets.get(s).get.json
-   s.setOffsetRange(
-     Optional.of(s.deserializeOffset(offsetJson)),
-     Optional.empty())
- } else {
-   s.setOffsetRange(Optional.empty(), Optional.empty())
- }
+ s.setOffsetRange(
+   toJava(availableOffsets.get(s).map(off => s.deserializeOffset(off.json))),
+   Optional.empty())

(s, Some(s.getEndOffset))
}
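
For reference, a small self-contained sketch of why the one-liner above is equivalent to the removed if/else. The offset types and helper names here are stand-ins, not the actual Spark interfaces:

import java.util.Optional

// Stand-ins for the offset kept in availableOffsets and the v2 offset the reader expects.
case class StoredOffset(json: String)
case class V2Offset(json: String)

def deserializeOffset(json: String): V2Offset = V2Offset(json)

def toJava(scalaOption: Option[V2Offset]): Optional[V2Offset] =
  Optional.ofNullable(scalaOption.orNull)

// Removed shape: branch explicitly on whether a start offset is already known.
def oldStart(stored: Option[StoredOffset]): Optional[V2Offset] =
  if (stored.isDefined) Optional.of(deserializeOffset(stored.get.json))
  else Optional.empty()

// New shape: map over the Option, then convert; both branches collapse into one expression.
def newStart(stored: Option[StoredOffset]): Optional[V2Offset] =
  toJava(stored.map(off => deserializeOffset(off.json)))

assert(oldStart(None) == newStart(None))
assert(oldStart(Some(StoredOffset("""{"0":1000}"""))) == newStart(Some(StoredOffset("""{"0":1000}"""))))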
@@ -405,7 +405,7 @@ class MicroBatchExecution(
if committedOffsets.get(reader).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
reader.setOffsetRange(
- Optional.ofNullable(current.orNull),
+ toJava(current),
Optional.of(available.asInstanceOf[v2.streaming.reader.Offset]))
logDebug(s"Retrieving data from $reader: $current -> $available")
Some(reader ->
@@ -30,7 +30,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.streaming.continuous.ContinuousRateStreamReader
- import org.apache.spark.sql.execution.streaming.sources.RateStreamV2Reader
+ import org.apache.spark.sql.execution.streaming.sources.MicroBatchRateStreamReader
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
@@ -245,7 +245,7 @@ class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): MicroBatchReader = {
- new RateStreamV2Reader(options)
+ new MicroBatchRateStreamReader(options)
}

override def shortName(): String = "ratev2"
@@ -34,7 +34,7 @@ import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offse
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
import org.apache.spark.util.{ManualClock, SystemClock}

- class RateStreamV2Reader(options: DataSourceV2Options)
+ class MicroBatchRateStreamReader(options: DataSourceV2Options)
extends MicroBatchReader {
implicit val defaultFormats: DefaultFormats = DefaultFormats

@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.continuous._
- import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamSourceV2, RateStreamV2Reader}
+ import org.apache.spark.sql.execution.streaming.sources.{MicroBatchRateStreamReader, RateStreamBatchTask, RateStreamSourceV2}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.streaming.StreamTest
@@ -38,7 +38,7 @@ class RateSourceV2Suite extends StreamTest {
override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = {
assert(query.nonEmpty)
val rateSource = query.get.logicalPlan.collect {
- case StreamingExecutionRelation(source: RateStreamV2Reader, _) => source
+ case StreamingExecutionRelation(source: MicroBatchRateStreamReader, _) => source
}.head
rateSource.clock.asInstanceOf[ManualClock].advance(TimeUnit.SECONDS.toMillis(seconds))
rateSource.setOffsetRange(Optional.empty(), Optional.empty())
@@ -50,7 +50,7 @@
DataSource.lookupDataSource("ratev2", spark.sqlContext.conf).newInstance() match {
case ds: MicroBatchReadSupport =>
val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty())
- assert(reader.isInstanceOf[RateStreamV2Reader])
+ assert(reader.isInstanceOf[MicroBatchRateStreamReader])
case _ =>
throw new IllegalStateException("Could not find v2 read support for rate")
}
@@ -75,15 +75,15 @@ class RateSourceV2Suite extends StreamTest {
}

test("microbatch - numPartitions propagated") {
- val reader = new RateStreamV2Reader(
+ val reader = new MicroBatchRateStreamReader(
new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
reader.setOffsetRange(Optional.empty(), Optional.empty())
val tasks = reader.createReadTasks()
assert(tasks.size == 11)
}

test("microbatch - set offset") {
- val reader = new RateStreamV2Reader(DataSourceV2Options.empty())
+ val reader = new MicroBatchRateStreamReader(DataSourceV2Options.empty())
val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 2000))))
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
@@ -92,7 +92,7 @@ class RateSourceV2Suite extends StreamTest {
}

test("microbatch - infer offsets") {
- val reader = new RateStreamV2Reader(
+ val reader = new MicroBatchRateStreamReader(
new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "100").asJava))
reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
reader.setOffsetRange(Optional.empty(), Optional.empty())
@@ -113,7 +113,7 @@ class RateSourceV2Suite extends StreamTest {
}

test("microbatch - predetermined batch size") {
- val reader = new RateStreamV2Reader(
+ val reader = new MicroBatchRateStreamReader(
new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava))
val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(20, 2000))))
@@ -124,7 +124,7 @@ class RateSourceV2Suite extends StreamTest {
}

test("microbatch - data read") {
- val reader = new RateStreamV2Reader(
+ val reader = new MicroBatchRateStreamReader(
new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
val startOffset = RateStreamSourceV2.createInitialOffset(11, reader.creationTimeMs)
val endOffset = RateStreamOffset(startOffset.partitionToValueAndRunTimeMs.toSeq.map {
