From 1d90cf11710e452c5b2adf819da8003c8f96c6e1 Mon Sep 17 00:00:00 2001
From: Wang Gengliang
Date: Wed, 31 Jan 2018 22:22:26 +0800
Subject: [PATCH] Address comments

---
 .../org/apache/spark/sql/kafka010/KafkaSourceOffset.scala     | 3 +--
 .../sql/execution/streaming/MicroBatchExecution.scala         | 8 ++++----
 2 files changed, 5 insertions(+), 6 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index 5b072eef43da2..8d41c0da2b133 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -27,8 +27,7 @@ import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, Par
  * their offsets.
  */
 private[kafka010]
-case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
-  extends OffsetV2 {
+case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
 
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 91c03e4a87e63..d9aa8573ba930 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -30,8 +30,8 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.sources.v2.reader.{streaming, MicroBatchReadSupport}
-import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
+import org.apache.spark.sql.sources.v2.reader.MicroBatchReadSupport
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
@@ -403,7 +403,7 @@ class MicroBatchExecution(
           val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
           reader.setOffsetRange(
             toJava(current),
-            Optional.of(available.asInstanceOf[streaming.Offset]))
+            Optional.of(available.asInstanceOf[OffsetV2]))
           logDebug(s"Retrieving data from $reader: $current -> $available")
           Some(reader ->
             new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
@@ -492,7 +492,7 @@ class MicroBatchExecution(
     }
   }
 
-  private def toJava(scalaOption: Option[streaming.Offset]): Optional[streaming.Offset] = {
+  private def toJava(scalaOption: Option[OffsetV2]): Optional[OffsetV2] = {
     Optional.ofNullable(scalaOption.orNull)
   }
 }
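
For context: the MicroBatchExecution change above lets the file refer to the DataSourceV2 offset type as OffsetV2 (via the renamed import "Offset => OffsetV2") instead of qualifying it as streaming.Offset, avoiding the clash with the legacy Offset type already in scope. Below is a minimal, self-contained sketch of that renamed-import pattern; the legacy/v2 objects and all names in it are hypothetical stand-ins for illustration, not the actual Spark classes.

// Sketch of Scala's renamed-import pattern: two unrelated Offset types
// coexist in one scope by aliasing one of them at the import site.
// legacy and v2 are hypothetical containers, not Spark packages.
object legacy { class Offset(val value: Long) }
object v2 { class Offset(val json: String) }

object RenamedImportDemo {
  import legacy.Offset                    // the legacy type keeps its plain name
  import v2.{Offset => OffsetV2}          // the v2 type is referred to as OffsetV2

  def describe(oldOffset: Offset, newOffset: OffsetV2): String =
    s"legacy=${oldOffset.value}, v2=${newOffset.json}"

  def main(args: Array[String]): Unit =
    println(describe(new Offset(42L), new OffsetV2("{\"topic-0\":42}")))
}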