address comments
gengliangwang committed Jan 31, 2018
1 parent 0b6b59e commit e609060
Showing 2 changed files with 3 additions and 4 deletions.

@@ -20,16 +20,15 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.reader.streaming
-import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
+import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
  * their offsets.
  */
 private[kafka010]
 case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
-  extends streaming.Offset {
+  extends OffsetV2 {
 
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
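
For context on this hunk: the new import uses Scala's rename-on-import syntax. The file already imports Offset from org.apache.spark.sql.execution.streaming, so the v2 Offset is bound to the alias OffsetV2 to avoid a name clash, which is what lets the case class declare extends OffsetV2. Below is a minimal, self-contained sketch of the same pattern; it uses only the standard library, not the Spark classes, and RenameImportDemo / JList are invented names for illustration.

    // Rename-on-import: java.util.List gets the alias JList so it can coexist
    // with Scala's own List, just as the v2 Offset is aliased to OffsetV2 above.
    import java.util.{ArrayList, List => JList}

    object RenameImportDemo {
      val javaList: JList[String] = new ArrayList[String]()  // reachable only via the alias
      val scalaList: List[String] = List("a", "b")           // scala.List keeps its plain name
    }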

@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
 import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
-import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, _}
+import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
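
A general Scala note on this hunk, not a claim about this file's history: an import selector list may name members explicitly and end with a wildcard, and as long as the named member still lives in that package, {StreamWriteSupport, _} makes the same names visible as a plain _, so the shorter form reads more cleanly. A small sketch of the selector syntax follows; WildcardImportDemo is an invented name and the mutable collections merely stand in for the writer package.

    // A named selector followed by a wildcard; the wildcard alone already
    // covers ArrayBuffer, so the explicit mention adds nothing new.
    import scala.collection.mutable.{ArrayBuffer, _}

    object WildcardImportDemo {
      val a = ArrayBuffer(1, 2, 3)   // named explicitly (and by the wildcard)
      val b = ListBuffer("x", "y")   // visible only through the wildcard
    }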
