Fix

gengliangwang committed Jan 30, 2018
1 parent d1c4fcb · commit c7c0a1d

Showing 6 changed files with 5 additions and 9 deletions.
File 1 of 6

@@ -22,8 +22,8 @@ import java.util.concurrent.TimeoutException
 
 import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
-import org.apache.spark.TaskContext
 
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
-import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, Offset, PartitionOffset}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
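The first hunk above deletes and re-adds the same line: org.apache.spark.TaskContext moves below the blank separator so the file matches the import grouping Spark's Scala style checks expect (java and scala imports first, then third-party packages such as org.apache.kafka, then org.apache.spark, with a blank line between groups). The last file in this commit gets the identical fix. A minimal sketch of that grouping, with illustrative imports only:

// 1. java and scala imports
import java.util.concurrent.TimeoutException

// 2. third-party packages
import org.apache.kafka.common.TopicPartition

// 3. org.apache.spark, always the last group
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging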
File 2 of 6

@@ -22,14 +22,14 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
 import org.apache.spark.sql.sources.v2.reader.streaming
 import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
-import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
  * their offsets.
  */
 private[kafka010]
-case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends streaming.Offset {
+case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
+  extends streaming.Offset {
 
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
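As in the previous file, the deleted import references the old v2.streaming.reader package path, while the surviving imports use v2.reader.streaming. For context on what the class itself does: KafkaSourceOffset renders its partition map as JSON through JsonUtils.partitionOffsets, and the same helper parses the string back. A minimal sketch of that round trip; since both the case class and JsonUtils are package-private, it assumes it runs inside org.apache.spark.sql.kafka010 (for example in that module's tests), and the object name is hypothetical:

package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

// Hypothetical driver, not part of the commit.
object KafkaSourceOffsetRoundTrip {
  def main(args: Array[String]): Unit = {
    // Two partitions of one topic, each with a current offset.
    val offset = KafkaSourceOffset(Map(
      new TopicPartition("topicA", 0) -> 23L,
      new TopicPartition("topicA", 1) -> 300L))

    // json delegates to JsonUtils.partitionOffsets(Map[TopicPartition, Long]),
    // producing a string like {"topicA":{"0":23,"1":300}}.
    println(offset.json)

    // The String overload parses the same representation back into a map.
    val restored: Map[TopicPartition, Long] = JsonUtils.partitionOffsets(offset.json)
    assert(restored == offset.partitionToOffsets)
  }
}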
File 3 of 6

@@ -22,7 +22,6 @@
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.DataSourceV2;
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
-import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.types.StructType;
 
File 4 of 6

@@ -32,7 +32,6 @@ import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWr
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader.{streaming, MicroBatchReadSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
-import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow}
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
 import org.apache.spark.util.{Clock, Utils}
File 5 of 6

@@ -21,10 +21,9 @@ import org.json4s.DefaultFormats
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.sources.v2
-import org.apache.spark.sql.sources.v2.reader.streaming.Offset
 
 case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, ValueRunTimeMsPair])
-  extends Offset {
+  extends v2.reader.streaming.Offset {
   implicit val defaultFormats: DefaultFormats = DefaultFormats
   override val json = Serialization.write(partitionToValueAndRunTimeMs)
 }
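RateStreamOffset takes the other serialization route visible in this commit: json4s instead of the Kafka-specific helper. A minimal sketch of what its json field evaluates to; the stand-in ValueRunTimeMsPair below assumes the (value, runTimeMs) field shape of the case class defined alongside the offset, and the object name is hypothetical:

import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

// Stand-in with the fields ValueRunTimeMsPair is assumed to carry.
case class ValueRunTimeMsPair(value: Long, runTimeMs: Long)

object RateStreamOffsetJson extends App {
  // Same implicit the case class declares for Serialization.write.
  implicit val defaultFormats: DefaultFormats = DefaultFormats

  // Partition 0 has emitted value 5 as of run time 1000 ms.
  val state = Map(0 -> ValueRunTimeMsPair(5L, 1000L))

  // Mirrors RateStreamOffset.json: prints {"0":{"value":5,"runTimeMs":1000}}
  // under the assumed field names.
  println(Serialization.write(state))
}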
File 6 of 6

@@ -20,8 +20,8 @@ package org.apache.spark.sql.sources.v2
 import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._
-import org.apache.spark.SparkException
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
