Skip to content

Commit

Permalink
move ratev2 register
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-torres committed Jan 4, 2018
1 parent 0bbff56 commit 2867880
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ org.apache.spark.sql.execution.datasources.text.TextFileFormat
org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProvider
org.apache.spark.sql.execution.streaming.RateSourceProviderV2
org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2
Original file line number Diff line number Diff line change
Expand Up @@ -236,21 +236,6 @@ class RateStreamSource(
s"rampUpTimeSeconds=$rampUpTimeSeconds, numPartitions=$numPartitions]"
}

/**
* This is a temporary register as we build out v2 migration. Microbatch read support should
* be implemented in the same register as v1.
*/
class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with DataSourceRegister {
override def createMicroBatchReader(
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): MicroBatchReader = {
new MicroBatchRateStreamReader(options)
}

override def shortName(): String = "ratev2"
}

object RateStreamSource {

/** Calculate the end value we will emit at the time `seconds`. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,29 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
import org.apache.spark.util.{ManualClock, SystemClock}

/**
* This is a temporary register as we build out v2 migration. Microbatch read support should
* be implemented in the same register as v1.
*/
class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with DataSourceRegister {
override def createMicroBatchReader(
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): MicroBatchReader = {
new MicroBatchRateStreamReader(options)
}

override def shortName(): String = "ratev2"
}

class MicroBatchRateStreamReader(options: DataSourceV2Options)
extends MicroBatchReader {
implicit val defaultFormats: DefaultFormats = DefaultFormats
Expand Down

0 comments on commit 2867880

Please sign in to comment.