Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed Jan 31, 2018
1 parent e609060 commit 1d90cf1
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, Par
* their offsets.
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
extends OffsetV2 {
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {

override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.{streaming, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
import org.apache.spark.sql.sources.v2.reader.MicroBatchReadSupport
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}
Expand Down Expand Up @@ -403,7 +403,7 @@ class MicroBatchExecution(
val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
reader.setOffsetRange(
toJava(current),
Optional.of(available.asInstanceOf[streaming.Offset]))
Optional.of(available.asInstanceOf[OffsetV2]))
logDebug(s"Retrieving data from $reader: $current -> $available")
Some(reader ->
new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
Expand Down Expand Up @@ -492,7 +492,7 @@ class MicroBatchExecution(
}
}

private def toJava(scalaOption: Option[streaming.Offset]): Optional[streaming.Offset] = {
private def toJava(scalaOption: Option[OffsetV2]): Optional[OffsetV2] = {
Optional.ofNullable(scalaOption.orNull)
}
}

0 comments on commit 1d90cf1

Please sign in to comment.